Kafka 流和 windowing 以在一段时间内保持计数 window
Kafka streams and windowing to keep count over a time window
我是 Whosebug 的新手,如果问题问得不好,请原谅我。非常感谢任何 help/inspiration!
我正在使用 Kafka 流过滤传入我的数据库的数据。传入的消息看起来像 {"ID":"X","time":"HH:MM"}
和一些其他参数,在这种情况下无关紧要。我设法获得了一个 java 应用程序 运行,它从一个主题中读取并打印出传入的消息。现在我想做的是使用 KTables(?) 对具有相同 ID 的传入消息进行分组,然后使用会话 window 对时隙中的 table 进行分组。我想要在时间轴上连续 运行 X 分钟的时间 window。
第一件事当然是得到一个KTable 运行 统计具有相同ID 的传入消息。我想做的应该是这样的:
ID Count
X 1
Y 3
Z 1
不断更新,因此从 table.
中删除具有过时时间戳的消息
我不是百分百确定,但我想我想要的是 KTables 而不是 KStreams,对吗?如果这是实现我想要的结果的正确方法,我该如何实现滑动 Window?
这是我现在使用的代码。它只读取一个主题并打印传入的消息。
private static List<String> printEvent(String o) {
System.out.println(o);
return Arrays.asList(o);
}
final StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream(srcTopic)
.flatMapValues(value -> printEvent(value));
我想知道我必须添加什么才能实现上述所需的输出,以及我将其放在我的代码中的什么位置。
在此先感谢您的帮助!
是的,您需要 Ktable 和滑动 window,我还建议您查看 watermark feature,以处理延迟传递消息。
Example
KTable<Windowed<Key>, Value> oneMinuteWindowed = yourKStream
.groupByKey()
.reduce(/*your adder*/, TimeWindows.of(60*1000, 60*1000), "store1m");
//where your adder can be as simple as (val, agg) -> agg + val
//for primitive types or as complex as you need
我是 Whosebug 的新手,如果问题问得不好,请原谅我。非常感谢任何 help/inspiration!
我正在使用 Kafka 流过滤传入我的数据库的数据。传入的消息看起来像 {"ID":"X","time":"HH:MM"}
和一些其他参数,在这种情况下无关紧要。我设法获得了一个 java 应用程序 运行,它从一个主题中读取并打印出传入的消息。现在我想做的是使用 KTables(?) 对具有相同 ID 的传入消息进行分组,然后使用会话 window 对时隙中的 table 进行分组。我想要在时间轴上连续 运行 X 分钟的时间 window。
第一件事当然是得到一个KTable 运行 统计具有相同ID 的传入消息。我想做的应该是这样的:
ID Count
X 1
Y 3
Z 1
不断更新,因此从 table.
中删除具有过时时间戳的消息我不是百分百确定,但我想我想要的是 KTables 而不是 KStreams,对吗?如果这是实现我想要的结果的正确方法,我该如何实现滑动 Window?
这是我现在使用的代码。它只读取一个主题并打印传入的消息。
private static List<String> printEvent(String o) {
System.out.println(o);
return Arrays.asList(o);
}
final StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream(srcTopic)
.flatMapValues(value -> printEvent(value));
我想知道我必须添加什么才能实现上述所需的输出,以及我将其放在我的代码中的什么位置。
在此先感谢您的帮助!
是的,您需要 Ktable 和滑动 window,我还建议您查看 watermark feature,以处理延迟传递消息。 Example
KTable<Windowed<Key>, Value> oneMinuteWindowed = yourKStream
.groupByKey()
.reduce(/*your adder*/, TimeWindows.of(60*1000, 60*1000), "store1m");
//where your adder can be as simple as (val, agg) -> agg + val
//for primitive types or as complex as you need