Apache Kafka 根据消息的值对窗口消息进行排序
Apache Kafka order windowed messages based on their value
我正在尝试找到一种方法来重新排序主题分区内的消息并将排序后的消息发送到新主题。
我有 Kafka 发布者发送以下格式的字符串消息:
{system_timestamp}-{event_name}?{parameters}
例如:
1494002667893-client.message?chatName=1c&messageBody=hello
1494002656558-chat.started?chatName=1c&chatPatricipants=3
此外,我们为每条消息添加一些消息密钥,以将它们发送到相应的分区。
我想做的是根据消息的 {system-timestamp} 部分并在 1 分钟内 window 重新排序事件,因为我们的发布者没有'保证消息将按照 {system-timestamp} 值发送。
例如,我们可以先向主题传递一条具有更大 {system-timestamp} 值的消息。
我调查了 Kafka Stream API 并找到了一些关于消息 windowing 和聚合的示例:
Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-sorter");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> stream = builder.stream("events");
KGroupedStream<String>, String> groupedStream = stream.groupByKey();//grouped events within partion.
/* commented since I think that I don't need any aggregation, but I guess without aggregation I can't use time windowing.
KTable<Windowed<String>, String> windowedEvents = stream.groupByKey().aggregate(
() -> "", // initial value
(aggKey, value, aggregate) -> aggregate + "", // aggregating value
TimeWindows.of(1000), // intervals in milliseconds
Serdes.String(), // serde for aggregated value
"test-store"
);*/
但是接下来我应该如何处理这个分组流?我没有看到任何可用的“sort() (e1,e2) -> e1.compareTo(e2)”方法,也可以将 windows 应用于方法像 aggregation(), reduce() ,count() , 但我认为我不不需要任何消息数据操作。
如何在 1 分钟内重新排序消息 window 并将它们发送到另一个主题?
这是一个大纲:
创建处理器实现:
在 process() 方法中,对于每条消息:
- 从消息值中读取时间戳
- 使用 (timestamp, message-key) 对作为键,使用消息值作为值插入到 KeyValueStore 中。注意,这也提供重复数据删除。您需要提供一个自定义 Serde 来序列化密钥,以便时间戳排在第一位,以字节为单位,以便范围查询首先按时间戳排序。
在 punctuate() 方法中:
- 使用从 0 到时间戳的范围提取读取存储 - 60'000(=1 分钟)
- 使用 context.forward() 按顺序发送获取的消息并从存储中删除它们
这种方法的问题在于,如果没有新消息到达以推进 "stream time",则不会触发 punctuate()。如果这对您的情况有风险,您可以创建一个外部调度程序,定期向主题的每个(!)分区发送 "tick" 消息,您的处理器应该忽略这些消息,但它们会导致标点触发缺少 "real" 条消息。
KIP-138 将通过添加对系统时间标点符号的明确支持来解决此限制:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-138%3A+Change+punctuate+semantics
这是我在我的项目中订购流的方式。
- 已创建包含源、处理器、接收器的拓扑。
- 在处理器中
- process(key, value) -> 将每条记录添加到 List(实例变量)。
- init() -> schedule(WINDOW_BUFFER_TIME, WALL_CLOCK_TIME) -> 标点(时间戳)排序List(实例变量)中window缓冲时间的项目列表并迭代和前进。清除列表(实例变量)。
这个逻辑适合我。
我正在尝试找到一种方法来重新排序主题分区内的消息并将排序后的消息发送到新主题。
我有 Kafka 发布者发送以下格式的字符串消息:
{system_timestamp}-{event_name}?{parameters}
例如:
1494002667893-client.message?chatName=1c&messageBody=hello
1494002656558-chat.started?chatName=1c&chatPatricipants=3
此外,我们为每条消息添加一些消息密钥,以将它们发送到相应的分区。
我想做的是根据消息的 {system-timestamp} 部分并在 1 分钟内 window 重新排序事件,因为我们的发布者没有'保证消息将按照 {system-timestamp} 值发送。
例如,我们可以先向主题传递一条具有更大 {system-timestamp} 值的消息。
我调查了 Kafka Stream API 并找到了一些关于消息 windowing 和聚合的示例:
Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-sorter");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> stream = builder.stream("events");
KGroupedStream<String>, String> groupedStream = stream.groupByKey();//grouped events within partion.
/* commented since I think that I don't need any aggregation, but I guess without aggregation I can't use time windowing.
KTable<Windowed<String>, String> windowedEvents = stream.groupByKey().aggregate(
() -> "", // initial value
(aggKey, value, aggregate) -> aggregate + "", // aggregating value
TimeWindows.of(1000), // intervals in milliseconds
Serdes.String(), // serde for aggregated value
"test-store"
);*/
但是接下来我应该如何处理这个分组流?我没有看到任何可用的“sort() (e1,e2) -> e1.compareTo(e2)”方法,也可以将 windows 应用于方法像 aggregation(), reduce() ,count() , 但我认为我不不需要任何消息数据操作。
如何在 1 分钟内重新排序消息 window 并将它们发送到另一个主题?
这是一个大纲:
创建处理器实现:
在 process() 方法中,对于每条消息:
- 从消息值中读取时间戳
- 使用 (timestamp, message-key) 对作为键,使用消息值作为值插入到 KeyValueStore 中。注意,这也提供重复数据删除。您需要提供一个自定义 Serde 来序列化密钥,以便时间戳排在第一位,以字节为单位,以便范围查询首先按时间戳排序。
在 punctuate() 方法中:
- 使用从 0 到时间戳的范围提取读取存储 - 60'000(=1 分钟)
- 使用 context.forward() 按顺序发送获取的消息并从存储中删除它们
这种方法的问题在于,如果没有新消息到达以推进 "stream time",则不会触发 punctuate()。如果这对您的情况有风险,您可以创建一个外部调度程序,定期向主题的每个(!)分区发送 "tick" 消息,您的处理器应该忽略这些消息,但它们会导致标点触发缺少 "real" 条消息。 KIP-138 将通过添加对系统时间标点符号的明确支持来解决此限制: https://cwiki.apache.org/confluence/display/KAFKA/KIP-138%3A+Change+punctuate+semantics
这是我在我的项目中订购流的方式。
- 已创建包含源、处理器、接收器的拓扑。
- 在处理器中
- process(key, value) -> 将每条记录添加到 List(实例变量)。
- init() -> schedule(WINDOW_BUFFER_TIME, WALL_CLOCK_TIME) -> 标点(时间戳)排序List(实例变量)中window缓冲时间的项目列表并迭代和前进。清除列表(实例变量)。
这个逻辑适合我。