Kafka流如何测试一个滑动window?
Kafka stream how to test a sliding window?
我有一个像这样测试滑动 window 的测试:
.groupByKey
.windowedBy {
TimeWindows.of(Duration.ofMinutes(10))
.advanceBy(Duration.ofMinutes(1)).grace(Duration.ofMillis(0))
}
.aggregate(...)
.suppress(Suppressed.untilWindowCloses(unbounded()))
我需要测试滑动 window 一步后是否产生正确的输出。
我做了这样的事情:
for (i <- 0 to 8) testDriver.pipeInput(record..., T0 to T8)// produce one record every minute (9 records)
for (i <- 0 to 8)testDriver.pipeInput(record..., T8 + 5min) // these comes with a timestamp > 10 minutes tso there are on a second window
第一个循环应该在第一次window内产生一个结果,在滑动 1 分钟后,第二个循环应该产生第二个记录,其中包含第二个 window 的结果(之后滑动)
我该如何检查?我不明白如何使用输出阅读器来检查这两个结果。
只要您希望输出可用,您就可以调用 testDriver.readOutput()
。当然,您需要将结果写入输出主题,例如
...
.suppress(Suppressed.untilWindowCloses(unbounded()))
.toStream()
.to("output-topic");
我有一个像这样测试滑动 window 的测试:
.groupByKey
.windowedBy {
TimeWindows.of(Duration.ofMinutes(10))
.advanceBy(Duration.ofMinutes(1)).grace(Duration.ofMillis(0))
}
.aggregate(...)
.suppress(Suppressed.untilWindowCloses(unbounded()))
我需要测试滑动 window 一步后是否产生正确的输出。 我做了这样的事情:
for (i <- 0 to 8) testDriver.pipeInput(record..., T0 to T8)// produce one record every minute (9 records)
for (i <- 0 to 8)testDriver.pipeInput(record..., T8 + 5min) // these comes with a timestamp > 10 minutes tso there are on a second window
第一个循环应该在第一次window内产生一个结果,在滑动 1 分钟后,第二个循环应该产生第二个记录,其中包含第二个 window 的结果(之后滑动) 我该如何检查?我不明白如何使用输出阅读器来检查这两个结果。
只要您希望输出可用,您就可以调用 testDriver.readOutput()
。当然,您需要将结果写入输出主题,例如
...
.suppress(Suppressed.untilWindowCloses(unbounded()))
.toStream()
.to("output-topic");