Kafka Stream 根据 json 消息中的时间戳键对消息进行排序
Kafka Stream to sort messages based on timestamp key in json message
我正在使用 JSON 消息发布 Kafka,例如:
"UserID":111,"UpdateTime":06-13-2018 12:13:43.200Z,"Comments":2,"Like":10
"UserID":111,"UpdateTime":06-13-2018 12:13:40.200Z,"Comments":0,"Like":6
"UserID":222,"UpdateTime":06-13-2018 12:13:43.200Z,"Comments":1,"Like":10
"UserID":111,"UpdateTime":06-13-2018 12:13:44.600Z,"Comments":3,"Like":12
我想使用 Kafka Streams 在 10 秒内 window 根据 UpdateTime
对消息进行排序,并在另一个 Kafka 主题中推回排序后的消息。
我创建了一个流,它从输入主题中读取数据,然后我在 groupByKey()
之后创建 TimeWindowedKStream
,其中 UserID 是消息中的键(尽管 groupByKey
和然后排序,但我不能直接得到 WindowedBy
)。但是我无法根据 UpdateTime
在 10 秒 window 内对消息进行排序。我的源代码是:
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-sorting");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker");
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream("UnsortedMessages");
TimeWindowedKStream<String, String> countss = source.groupByKey().windowedBy(TimeWindows.of(10000L)
.until(10000L));
/*
SORTING CODE
*/
outputMessage.toStream().to("SortedMessages", Produced.with(Serdes.String(), Serdes.Long()));
final KafkaStreams streams = new KafkaStreams(builder.build(), props);
final CountDownLatch latch = new CountDownLatch(1);
// attach shutdown handler to catch control-c
Runtime.getRuntime().addShutdownHook(new Thread("streams-sorting-shutdown-hook") {
@Override
public void run() {
streams.close();
latch.countDown();
}
});
try {
streams.start();
latch.await();
} catch (Throwable e) {
System.exit(1);
}
System.exit(0);
}
非常感谢。
如果您想忽略键对消息进行排序,只有基于分区进行排序才有意义,并且只有当输入主题与输出主题具有相同数量的分区时才有意义。对于这种情况,您应该提取分区号并将其用作消息密钥 (cf: https://docs.confluent.io/current/streams/faq.html#accessing-record-metadata-such-as-topic-partition-and-offset-information)
对于排序,比较棘手。请注意,Kafka Streams 遵循 "continuous output" 模型并使用 DSL 为每个输入记录发出更新。因此,最好使用处理器 API。您将使用带有附加商店的 Processor
并将记录放入商店。作为一个内存结构,你保存了一个排序的记录列表。随着时间的推移,您可以发出 "finished" windows 并从存储中删除相应的记录。
我认为您无法使用 DSL 构建它。
我正在使用 JSON 消息发布 Kafka,例如:
"UserID":111,"UpdateTime":06-13-2018 12:13:43.200Z,"Comments":2,"Like":10
"UserID":111,"UpdateTime":06-13-2018 12:13:40.200Z,"Comments":0,"Like":6
"UserID":222,"UpdateTime":06-13-2018 12:13:43.200Z,"Comments":1,"Like":10
"UserID":111,"UpdateTime":06-13-2018 12:13:44.600Z,"Comments":3,"Like":12
我想使用 Kafka Streams 在 10 秒内 window 根据 UpdateTime
对消息进行排序,并在另一个 Kafka 主题中推回排序后的消息。
我创建了一个流,它从输入主题中读取数据,然后我在 groupByKey()
之后创建 TimeWindowedKStream
,其中 UserID 是消息中的键(尽管 groupByKey
和然后排序,但我不能直接得到 WindowedBy
)。但是我无法根据 UpdateTime
在 10 秒 window 内对消息进行排序。我的源代码是:
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-sorting");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker");
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream("UnsortedMessages");
TimeWindowedKStream<String, String> countss = source.groupByKey().windowedBy(TimeWindows.of(10000L)
.until(10000L));
/*
SORTING CODE
*/
outputMessage.toStream().to("SortedMessages", Produced.with(Serdes.String(), Serdes.Long()));
final KafkaStreams streams = new KafkaStreams(builder.build(), props);
final CountDownLatch latch = new CountDownLatch(1);
// attach shutdown handler to catch control-c
Runtime.getRuntime().addShutdownHook(new Thread("streams-sorting-shutdown-hook") {
@Override
public void run() {
streams.close();
latch.countDown();
}
});
try {
streams.start();
latch.await();
} catch (Throwable e) {
System.exit(1);
}
System.exit(0);
}
非常感谢。
如果您想忽略键对消息进行排序,只有基于分区进行排序才有意义,并且只有当输入主题与输出主题具有相同数量的分区时才有意义。对于这种情况,您应该提取分区号并将其用作消息密钥 (cf: https://docs.confluent.io/current/streams/faq.html#accessing-record-metadata-such-as-topic-partition-and-offset-information)
对于排序,比较棘手。请注意,Kafka Streams 遵循 "continuous output" 模型并使用 DSL 为每个输入记录发出更新。因此,最好使用处理器 API。您将使用带有附加商店的 Processor
并将记录放入商店。作为一个内存结构,你保存了一个排序的记录列表。随着时间的推移,您可以发出 "finished" windows 并从存储中删除相应的记录。
我认为您无法使用 DSL 构建它。