如何管理 Kafka KStream 到 Kstream 的窗口化连接?
How to manage Kafka KStream to Kstream windowed join?
基于apache Kafka docsKStream-to-KStream Joins are always windowed joins
,我的问题是如何控制window的大小?保持主题数据的大小是否相同?或者,例如,我们可以保留 1 个月的数据,但只加入过去一周的数据流?
是否有任何很好的示例来显示 windowed KStream-to-kStream windowed 连接?
就我而言,假设我有 2 个 KStream,kstream1
和 kstream2
我希望能够加入 10 天的 kstream1
到 30 天的 kstream2
.
这绝对有可能。当您定义 Stream 运算符时,您可以明确指定连接 window 大小。
KStream stream1 = ...;
KStream stream2 = ...;
long joinWindowSizeMs = 5L * 60L * 1000L; // 5 minutes
long windowRetentionTimeMs = 30L * 24L * 60L * 60L * 1000L; // 30 days
stream1.leftJoin(stream2,
... // add ValueJoiner
JoinWindows.of(joinWindowSizeMs)
);
// or if you want to use retention time
stream1.leftJoin(stream2,
... // add ValueJoiner
(JoinWindows)JoinWindows.of(joinWindowSizeMs)
.until(windowRetentionTimeMs)
);
有关详细信息,请参阅 http://docs.confluent.io/current/streams/developer-guide.html#joining-streams。
滑动window 基本上定义了一个额外的连接谓词。在 SQL-like 语法中,这类似于:
SELECT * FROM stream1, stream2
WHERE
stream1.key = stream2.key
AND
stream1.ts - before <= stream2.ts
AND
stream2.ts <= stream1.ts + after
在此示例中为 before == after == joinWindowSizeMs
。如果您使用 JoinWindows#before()
和 JoinWindows#after()
明确设置这些值,before
和 after
也可以有不同的值。
源主题的保留时间完全独立于应用于 Kafka Streams 本身创建的变更日志主题的指定 windowRetentionTimeMs
。 Window 保留允许将 out-of-order 记录相互连接,即迟到的记录(请记住,Kafka 有一个基于 offset 的排序保证,但是关于 timestamps,记录可以是 out-of-order).
除了 Matthias J. Sax 所说的之外,还有一个 stream-to-stream(窗口化)连接示例位于:
https://github.com/confluentinc/examples/blob/3.1.x/kafka-streams/src/test/java/io/confluent/examples/streams/StreamToStreamJoinIntegrationTest.java
这适用于 Confluent 3。1.x 与 Apache Kafka 0.10.1,即截至 2017 年 1 月的最新版本。请参阅上面存储库中的 master
分支以获取使用较新版本的代码示例版本。
这是上面代码示例的关键部分(同样适用于 Kafka 0.10.1),稍微适应了您的问题。请注意,此示例恰好演示了 OUTER JOIN。
long joinWindowSizeMs = TimeUnit.MINUTES.toMillis(5);
long windowRetentionTimeMs = TimeUnit.DAYS.toMillis(30);
final Serde<String> stringSerde = Serdes.String();
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> alerts = builder.stream(stringSerde, stringSerde, "adImpressionsTopic");
KStream<String, String> incidents = builder.stream(stringSerde, stringSerde, "adClicksTopic");
KStream<String, String> impressionsAndClicks = alerts.outerJoin(incidents,
(impressionValue, clickValue) -> impressionValue + "/" + clickValue,
// KStream-KStream joins are always windowed joins, hence we must provide a join window.
JoinWindows.of(joinWindowSizeMs).until(windowRetentionTimeMs),
stringSerde, stringSerde, stringSerde);
// Write the results to the output topic.
impressionsAndClicks.to(stringSerde, stringSerde, "outputTopic");
基于apache Kafka docsKStream-to-KStream Joins are always windowed joins
,我的问题是如何控制window的大小?保持主题数据的大小是否相同?或者,例如,我们可以保留 1 个月的数据,但只加入过去一周的数据流?
是否有任何很好的示例来显示 windowed KStream-to-kStream windowed 连接?
就我而言,假设我有 2 个 KStream,kstream1
和 kstream2
我希望能够加入 10 天的 kstream1
到 30 天的 kstream2
.
这绝对有可能。当您定义 Stream 运算符时,您可以明确指定连接 window 大小。
KStream stream1 = ...;
KStream stream2 = ...;
long joinWindowSizeMs = 5L * 60L * 1000L; // 5 minutes
long windowRetentionTimeMs = 30L * 24L * 60L * 60L * 1000L; // 30 days
stream1.leftJoin(stream2,
... // add ValueJoiner
JoinWindows.of(joinWindowSizeMs)
);
// or if you want to use retention time
stream1.leftJoin(stream2,
... // add ValueJoiner
(JoinWindows)JoinWindows.of(joinWindowSizeMs)
.until(windowRetentionTimeMs)
);
有关详细信息,请参阅 http://docs.confluent.io/current/streams/developer-guide.html#joining-streams。
滑动window 基本上定义了一个额外的连接谓词。在 SQL-like 语法中,这类似于:
SELECT * FROM stream1, stream2
WHERE
stream1.key = stream2.key
AND
stream1.ts - before <= stream2.ts
AND
stream2.ts <= stream1.ts + after
在此示例中为 before == after == joinWindowSizeMs
。如果您使用 JoinWindows#before()
和 JoinWindows#after()
明确设置这些值,before
和 after
也可以有不同的值。
源主题的保留时间完全独立于应用于 Kafka Streams 本身创建的变更日志主题的指定 windowRetentionTimeMs
。 Window 保留允许将 out-of-order 记录相互连接,即迟到的记录(请记住,Kafka 有一个基于 offset 的排序保证,但是关于 timestamps,记录可以是 out-of-order).
除了 Matthias J. Sax 所说的之外,还有一个 stream-to-stream(窗口化)连接示例位于: https://github.com/confluentinc/examples/blob/3.1.x/kafka-streams/src/test/java/io/confluent/examples/streams/StreamToStreamJoinIntegrationTest.java
这适用于 Confluent 3。1.x 与 Apache Kafka 0.10.1,即截至 2017 年 1 月的最新版本。请参阅上面存储库中的 master
分支以获取使用较新版本的代码示例版本。
这是上面代码示例的关键部分(同样适用于 Kafka 0.10.1),稍微适应了您的问题。请注意,此示例恰好演示了 OUTER JOIN。
long joinWindowSizeMs = TimeUnit.MINUTES.toMillis(5);
long windowRetentionTimeMs = TimeUnit.DAYS.toMillis(30);
final Serde<String> stringSerde = Serdes.String();
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> alerts = builder.stream(stringSerde, stringSerde, "adImpressionsTopic");
KStream<String, String> incidents = builder.stream(stringSerde, stringSerde, "adClicksTopic");
KStream<String, String> impressionsAndClicks = alerts.outerJoin(incidents,
(impressionValue, clickValue) -> impressionValue + "/" + clickValue,
// KStream-KStream joins are always windowed joins, hence we must provide a join window.
JoinWindows.of(joinWindowSizeMs).until(windowRetentionTimeMs),
stringSerde, stringSerde, stringSerde);
// Write the results to the output topic.
impressionsAndClicks.to(stringSerde, stringSerde, "outputTopic");