与 TestPipeline 一起使用时 SlidingWindows 的奇怪行为
Odd behaviour of SlidingWindows when used with TestPipeline
我有一个简单的测试,演示了与 TestPipeline 一起使用时滑动 window 的奇怪行为。基本上是将一堆字符串馈送到输入,然后它们在滑动 window 中累积,然后应用求和聚合来计算重复项,最后记录聚合函数的输出。在 10 分钟持续时间和 5 分钟周期的滑动 window 中,我预计只有一个 window 用于存储所有元素(因为新元素在第一个元素之后的 5 分钟内开始)...
public class SlidingWindowTest {
private static PipelineOptions options = PipelineOptionsFactory.create();
private static final Logger LOG = LoggerFactory.getLogger(SlidingWindowTest.class);
private static class IdentityDoFn extends DoFn<KV<String, Integer>, KV<String, Integer>>
implements DoFn.RequiresWindowAccess{
@Override
public void processElement(ProcessContext processContext) throws Exception {
KV<String, Integer> item = processContext.element();
LOG.info("~~~~~~~~~~> {} => {}", item.getKey(), item.getValue());
LOG.info("~~~~~~~~~~~ {}", processContext.window());
processContext.output(item);
}
}
@Test
public void whatsWrongWithSlidingWindow() {
Pipeline p = TestPipeline.create(options);
p.apply(Create.of("cab", "abc", "a1b2c3", "abc", "a1b2c3"))
.apply(MapElements.via((String item) -> KV.of(item, 1))
.withOutputType(new TypeDescriptor<KV<String, Integer>>() {}))
.apply(Window.<KV<String, Integer>>into(SlidingWindows.of(Duration.standardMinutes(10))
.every(Duration.standardMinutes(5))))
.apply(Sum.integersPerKey())
.apply(ParDo.of(new IdentityDoFn()));
p.run();
}
}
但我 8 windows 却被解雇了。 TestPipeline 或我对滑动 windows 应该如何工作的理解有问题吗?
12:19:04.566 [main] DEBUG c.g.c.d.sdk.coders.CoderRegistry - Default coder for com.google.cloud.dataflow.sdk.values.KV<java.lang.String, java.lang.Integer>: KvCoder(StringUtf8Coder, VarIntCoder)
12:19:04.566 [main] INFO c.q.m.core.SlidingWindowTest - ~~~~~~~~~~> abc => 2
12:19:04.567 [main] INFO c.q.m.core.SlidingWindowTest - ~~~~~~~~~~~ [-290308-12-21T19:50:00.000Z..-290308-12-21T20:00:00.000Z)
12:19:04.567 [main] INFO c.q.m.core.SlidingWindowTest - ~~~~~~~~~~> abc => 2
12:19:04.567 [main] INFO c.q.m.core.SlidingWindowTest - ~~~~~~~~~~~ [-290308-12-21T19:55:00.000Z..-290308-12-21T20:05:00.000Z)
12:19:04.567 [main] INFO c.q.m.core.SlidingWindowTest - ~~~~~~~~~~> a1b2c3 => 2
12:19:04.567 [main] INFO c.q.m.core.SlidingWindowTest - ~~~~~~~~~~~ [-290308-12-21T20:00:00.000Z..-290308-12-21T20:10:00.000Z)
12:19:04.567 [main] INFO c.q.m.core.SlidingWindowTest - ~~~~~~~~~~> cab => 1
12:19:04.568 [main] INFO c.q.m.core.SlidingWindowTest - ~~~~~~~~~~~ [-290308-12-21T19:50:00.000Z..-290308-12-21T20:00:00.000Z)
12:19:04.568 [main] INFO c.q.m.core.SlidingWindowTest - ~~~~~~~~~~> a1b2c3 => 2
12:19:04.568 [main] INFO c.q.m.core.SlidingWindowTest - ~~~~~~~~~~~ [-290308-12-21T19:50:00.000Z..-290308-12-21T20:00:00.000Z)
12:19:04.568 [main] INFO c.q.m.core.SlidingWindowTest - ~~~~~~~~~~> cab => 1
12:19:04.568 [main] INFO c.q.m.core.SlidingWindowTest - ~~~~~~~~~~~ [-290308-12-21T19:55:00.000Z..-290308-12-21T20:05:00.000Z)
12:19:04.568 [main] INFO c.q.m.core.SlidingWindowTest - ~~~~~~~~~~> abc => 2
12:19:04.568 [main] INFO c.q.m.core.SlidingWindowTest - ~~~~~~~~~~~ [-290308-12-21T20:00:00.000Z..-290308-12-21T20:10:00.000Z)
12:19:04.568 [main] INFO c.q.m.core.SlidingWindowTest - ~~~~~~~~~~> cab => 1
12:19:04.568 [main] INFO c.q.m.core.SlidingWindowTest - ~~~~~~~~~~~ [-290308-12-21T20:00:00.000Z..-290308-12-21T20:10:00.000Z)
P/S: Dataflow sdk 版本: 1.8.0
预期的行为与您观察到的不同,但也与您的预期不同:
- 首先,你有三个不同的键,所以如果它们都落入一个 window,那么你会期望三个输出。
- 对于以 5 分钟为周期滑动 windows 10 分钟,每个元素必然落入两个 windows。如果一个元素在
1
分钟到达,它既属于 0
到 10
的 window 也属于 -5
到 [=] 的 window 14=]。所以你应该期望 six 输出值,每个键两个。将 windows 视为随着管道运行而更新的东西是一个常见的陷阱,而实际上它们只是输入 data 的简单计算属性,而不是 属性 它的到达时间或管道的执行。
Create
转换将输出时间戳为 BoundedWindow.TIMESTAMP_MIN_VALUE
的所有值,因此它们应该都属于相同的两个 windows.
您的示例似乎表明存在真正的错误。 "a1b2c3"
不应该在它落入的两个不相交的 windows 中,也不可能 "abc"
落在三个 windows 中,其中两个不相交。
不过,顺便说一句,你会受益于检查 DataflowAssert
(called PAssert
现在在 Beam 中)以一致和交叉运行的方式测试 PCollection
的内容。
我有一个简单的测试,演示了与 TestPipeline 一起使用时滑动 window 的奇怪行为。基本上是将一堆字符串馈送到输入,然后它们在滑动 window 中累积,然后应用求和聚合来计算重复项,最后记录聚合函数的输出。在 10 分钟持续时间和 5 分钟周期的滑动 window 中,我预计只有一个 window 用于存储所有元素(因为新元素在第一个元素之后的 5 分钟内开始)...
public class SlidingWindowTest {
private static PipelineOptions options = PipelineOptionsFactory.create();
private static final Logger LOG = LoggerFactory.getLogger(SlidingWindowTest.class);
private static class IdentityDoFn extends DoFn<KV<String, Integer>, KV<String, Integer>>
implements DoFn.RequiresWindowAccess{
@Override
public void processElement(ProcessContext processContext) throws Exception {
KV<String, Integer> item = processContext.element();
LOG.info("~~~~~~~~~~> {} => {}", item.getKey(), item.getValue());
LOG.info("~~~~~~~~~~~ {}", processContext.window());
processContext.output(item);
}
}
@Test
public void whatsWrongWithSlidingWindow() {
Pipeline p = TestPipeline.create(options);
p.apply(Create.of("cab", "abc", "a1b2c3", "abc", "a1b2c3"))
.apply(MapElements.via((String item) -> KV.of(item, 1))
.withOutputType(new TypeDescriptor<KV<String, Integer>>() {}))
.apply(Window.<KV<String, Integer>>into(SlidingWindows.of(Duration.standardMinutes(10))
.every(Duration.standardMinutes(5))))
.apply(Sum.integersPerKey())
.apply(ParDo.of(new IdentityDoFn()));
p.run();
}
}
但我 8 windows 却被解雇了。 TestPipeline 或我对滑动 windows 应该如何工作的理解有问题吗?
12:19:04.566 [main] DEBUG c.g.c.d.sdk.coders.CoderRegistry - Default coder for com.google.cloud.dataflow.sdk.values.KV<java.lang.String, java.lang.Integer>: KvCoder(StringUtf8Coder, VarIntCoder)
12:19:04.566 [main] INFO c.q.m.core.SlidingWindowTest - ~~~~~~~~~~> abc => 2
12:19:04.567 [main] INFO c.q.m.core.SlidingWindowTest - ~~~~~~~~~~~ [-290308-12-21T19:50:00.000Z..-290308-12-21T20:00:00.000Z)
12:19:04.567 [main] INFO c.q.m.core.SlidingWindowTest - ~~~~~~~~~~> abc => 2
12:19:04.567 [main] INFO c.q.m.core.SlidingWindowTest - ~~~~~~~~~~~ [-290308-12-21T19:55:00.000Z..-290308-12-21T20:05:00.000Z)
12:19:04.567 [main] INFO c.q.m.core.SlidingWindowTest - ~~~~~~~~~~> a1b2c3 => 2
12:19:04.567 [main] INFO c.q.m.core.SlidingWindowTest - ~~~~~~~~~~~ [-290308-12-21T20:00:00.000Z..-290308-12-21T20:10:00.000Z)
12:19:04.567 [main] INFO c.q.m.core.SlidingWindowTest - ~~~~~~~~~~> cab => 1
12:19:04.568 [main] INFO c.q.m.core.SlidingWindowTest - ~~~~~~~~~~~ [-290308-12-21T19:50:00.000Z..-290308-12-21T20:00:00.000Z)
12:19:04.568 [main] INFO c.q.m.core.SlidingWindowTest - ~~~~~~~~~~> a1b2c3 => 2
12:19:04.568 [main] INFO c.q.m.core.SlidingWindowTest - ~~~~~~~~~~~ [-290308-12-21T19:50:00.000Z..-290308-12-21T20:00:00.000Z)
12:19:04.568 [main] INFO c.q.m.core.SlidingWindowTest - ~~~~~~~~~~> cab => 1
12:19:04.568 [main] INFO c.q.m.core.SlidingWindowTest - ~~~~~~~~~~~ [-290308-12-21T19:55:00.000Z..-290308-12-21T20:05:00.000Z)
12:19:04.568 [main] INFO c.q.m.core.SlidingWindowTest - ~~~~~~~~~~> abc => 2
12:19:04.568 [main] INFO c.q.m.core.SlidingWindowTest - ~~~~~~~~~~~ [-290308-12-21T20:00:00.000Z..-290308-12-21T20:10:00.000Z)
12:19:04.568 [main] INFO c.q.m.core.SlidingWindowTest - ~~~~~~~~~~> cab => 1
12:19:04.568 [main] INFO c.q.m.core.SlidingWindowTest - ~~~~~~~~~~~ [-290308-12-21T20:00:00.000Z..-290308-12-21T20:10:00.000Z)
P/S: Dataflow sdk 版本: 1.8.0
预期的行为与您观察到的不同,但也与您的预期不同:
- 首先,你有三个不同的键,所以如果它们都落入一个 window,那么你会期望三个输出。
- 对于以 5 分钟为周期滑动 windows 10 分钟,每个元素必然落入两个 windows。如果一个元素在
1
分钟到达,它既属于0
到10
的 window 也属于-5
到 [=] 的 window 14=]。所以你应该期望 six 输出值,每个键两个。将 windows 视为随着管道运行而更新的东西是一个常见的陷阱,而实际上它们只是输入 data 的简单计算属性,而不是 属性 它的到达时间或管道的执行。 Create
转换将输出时间戳为BoundedWindow.TIMESTAMP_MIN_VALUE
的所有值,因此它们应该都属于相同的两个 windows.
您的示例似乎表明存在真正的错误。 "a1b2c3"
不应该在它落入的两个不相交的 windows 中,也不可能 "abc"
落在三个 windows 中,其中两个不相交。
不过,顺便说一句,你会受益于检查 DataflowAssert
(called PAssert
现在在 Beam 中)以一致和交叉运行的方式测试 PCollection
的内容。