Kafka Streams join with a specific key as input
I have 3 different topics and 3 Avro schemas in the Schema Registry. I want to stream these topics, join them, and write the result to a single topic. The problem is that the key I want to join on is different from the key I use when writing data to each topic.
Let's say we have these 3 Avro schemas:
Alarm:
{
"type" : "record",
"name" : "Alarm",
"namespace" : "com.kafkastream.schema.avro",
"fields" : [ {
"name" : "alarm_id",
"type" : "string",
"doc" : "Unique identifier of the alarm."
}, {
"name" : "ne_id",
"type" : "string",
"doc" : "Unique identifier of the network element ID that produces the alarm."
}, {
"name" : "start_time",
"type" : "long",
"doc" : "The timestamp when the alarm was generated."
}, {
"name" : "severity",
"type" : [ "null", "string" ],
"doc" : "The severity field is the default severity associated to the alarm ",
"default" : null
}]
}
Incident:
{
"type" : "record",
"name" : "Incident",
"namespace" : "com.kafkastream.schema.avro",
"fields" : [ {
"name" : "incident_id",
"type" : "string",
"doc" : "Unique identifier of the incident."
}, {
"name" : "incident_type",
"type" : [ "null", "string" ],
"doc" : "Categorization of the incident e.g. Network fault, network at risk, customer impact, etc",
"default" : null
}, {
"name" : "alarm_source_id",
"type" : "string",
"doc" : "Respective Alarm"
}, {
"name" : "start_time",
"type" : "long",
"doc" : "The timestamp when the incident was generated on the node."
}, {
"name" : "ne_id",
"type" : "string",
"doc" : "ID of specific network element."
}]
}
Maintenance:
{
"type" : "record",
"name" : "Maintenance",
"namespace" : "com.kafkastream.schema.avro",
"fields" : [ {
"name" : "maintenance_id",
"type" : "string",
"doc" : "The message number is the unique ID for every maintenance"
}, {
"name" : "ne_id",
"type" : "string",
"doc" : "The NE ID is the network element ID on which the maintenance is done."
}, {
"name" : "start_time",
"type" : "long",
"doc" : "The timestamp when the maintenance starts."
}, {
"name" : "end_time",
"type" : "long",
"doc" : "The timestamp when the maintenance ends."
}]
}
I have 3 topics in my Kafka, one per Avro schema (say alarm_raw, incident_raw, maintenance_raw), and whenever I write to these topics I use ne_id as the key (so each topic is partitioned by ne_id). Now I want to join these 3 topics, produce a new record, and write it to a new topic. The problem is that I want to join Alarm and Incident on alarm_id and alarm_source_id, but join Alarm and Maintenance on ne_id. I want to avoid creating new topics and assigning new keys. Is there any way to specify the key at join time?
It depends on which kind of join you want to use (cf. https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics).

For a KStream-KStream join, there is currently (v0.10.2 and earlier) no other way than setting a new key (e.g., by using selectKey()) and repartitioning.
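For the question's data, that re-keying could look like the following sketch (Scala against the Kafka Streams 0.10.2 Java API). This is an assumption-laden illustration, not code from the answer: the Alarm and Incident classes are assumed to be Avro-generated from the schemas above, the streams are assumed to be read from alarm_raw and incident_raw already, serde configuration is omitted, and the joined output is a placeholder string.

```scala
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.{JoinWindows, KStream, KeyValueMapper, ValueJoiner}

// Assumption: alarms: KStream[String, Alarm] and incidents: KStream[String, Incident]
// were obtained from builder.stream(...) on alarm_raw / incident_raw, keyed by ne_id.

// Re-key each stream on the join attribute; Kafka Streams will repartition
// both streams through internal topics before performing the join.
val alarmsById: KStream[String, Alarm] = alarms.selectKey[String](
  new KeyValueMapper[String, Alarm, String] {
    override def apply(key: String, alarm: Alarm): String = alarm.getAlarmId.toString
  })

val incidentsById: KStream[String, Incident] = incidents.selectKey[String](
  new KeyValueMapper[String, Incident, String] {
    override def apply(key: String, incident: Incident): String = incident.getAlarmSourceId.toString
  })

// Windowed KStream-KStream join on alarm_id == alarm_source_id.
val joined: KStream[String, String] = alarmsById.join(
  incidentsById,
  new ValueJoiner[Alarm, Incident, String] {
    override def apply(alarm: Alarm, incident: Incident): String =
      s"${alarm.getAlarmId},${incident.getIncidentId}" // placeholder combined record
  },
  JoinWindows.of(5 * 60 * 1000L)) // 5-minute join window
```

The selectKey() calls are what force the repartition: each one creates an internal repartition topic keyed on the new key before the join executes.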
For a KStream-KTable join, Kafka 0.10.2 (which will be released in the next few weeks) contains a new feature called GlobalKTables (cf. https://cwiki.apache.org/confluence/display/KAFKA/KIP-99%3A+Add+Global+Tables+to+Kafka+Streams). This allows you to do a non-key join against the table side (i.e., a KStream-GlobalKTable join), so you do not need to repartition the data going into the GlobalKTable.

Note: a KStream-GlobalKTable join has different semantics than a KStream-KTable join. In contrast to the latter it is not time synchronized, and thus the join is non-deterministic by design with regard to GlobalKTable updates; i.e., there is no guarantee which KStream record will be the first to "see" a GlobalKTable update and thus join with the updated GlobalKTable record.

Adding a KTable-GlobalKTable join is also planned. This might become available in 0.10.3. There are no plans to add "global" KStream-KStream joins though.
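Applied to the question, the maintenance side could then be joined without any repartitioning, because maintenance_raw is already keyed by ne_id. A hedged sketch against the 0.10.2 API follows; the store name, the Avro-generated Maintenance/Alarm classes, and the placeholder output record are all assumptions, not part of the original answer:

```scala
import org.apache.kafka.streams.kstream.{GlobalKTable, KStream, KeyValueMapper, ValueJoiner}

// Read maintenance_raw as a GlobalKTable keyed by ne_id (its existing key),
// so no repartitioning is needed on the table side.
val maintenance: GlobalKTable[String, Maintenance] =
  builder.globalTable("maintenance_raw", "maintenance-store")

// KStream-GlobalKTable join: the KeyValueMapper extracts the lookup key
// (ne_id) from each Alarm value, so the alarm stream itself can keep
// whatever key it currently has.
val enriched: KStream[String, String] = alarms.join(
  maintenance,
  new KeyValueMapper[String, Alarm, String] {
    override def apply(key: String, alarm: Alarm): String = alarm.getNeId.toString
  },
  new ValueJoiner[Alarm, Maintenance, String] {
    override def apply(alarm: Alarm, m: Maintenance): String =
      s"${alarm.getAlarmId},${m.getMaintenanceId}" // placeholder combined record
  })
```

Because the lookup key is computed per record from the stream's value, this is exactly the "non-key join" the answer describes.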
You can change the key to the one you need for the join by modifying it. You can use a KeyValueMapper to modify your key and value.

You should use it as follows:
// Note: the hard-coded "modifiedKey" is only a placeholder; in practice you
// would derive the new key from the value (e.g. the alarm_id field).
val modifiedStream = kStream.map[String, String](
  new KeyValueMapper[String, String, KeyValue[String, String]] {
    override def apply(key: String, value: String): KeyValue[String, String] =
      new KeyValue("modifiedKey", value)
  }
)
You can apply the above logic to multiple KStream objects in order to maintain a single key for joining the KStreams. Note, however, that map() (like selectKey()) flags the stream for repartitioning, so Kafka Streams will still create an internal repartition topic before the join.
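When only the key changes and the value passes through unchanged, selectKey() expresses the same thing more directly than map(). A minimal sketch with the same placeholder key as above:

```scala
import org.apache.kafka.streams.kstream.{KStream, KeyValueMapper}

// selectKey() maps only the key; the value is passed through as-is.
// Like map(), it marks the stream for repartitioning before a join.
val reKeyed: KStream[String, String] = kStream.selectKey[String](
  new KeyValueMapper[String, String, String] {
    override def apply(key: String, value: String): String = "modifiedKey"
  })
```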