使用 Apache Beam 开窗 - 已修复 Windows 似乎没有关闭?
Windowing with Apache Beam - Fixed Windows Don't Seem to be Closing?
我们正在尝试在 Apache Beam 管道上使用固定 windows(使用 DirectRunner
)。我们的流程如下:
- 从pub/sub
中提取数据
- 将JSON反序列化为Java对象
- Window 事件固定 windows 5 秒
- 使用自定义
CombineFn
,将 Event
中的每个 window 组合成一个 List<Event>
- 为了测试,简单输出结果
List<Event>
流水线代码:
pipeline
// Read from pubsub topic to create unbounded PCollection
.apply(PubsubIO
.<String>read()
.topic(options.getTopic())
.withCoder(StringUtf8Coder.of())
)
// Deserialize JSON into Event object
.apply("ParseEvent", ParDo
.of(new ParseEventFn())
)
// Window events with a fixed window size of 5 seconds
.apply("Window", Window
.<Event>into(FixedWindows
.of(Duration.standardSeconds(5))
)
)
// Group events by window
.apply("CombineEvents", Combine
.globally(new CombineEventsFn())
.withoutDefaults()
)
// Log grouped events
.apply("LogEvent", ParDo
.of(new LogEventFn())
);
我们看到的结果是最后一步永远不会 运行,因为我们没有得到任何日志记录。
此外,我们在自定义 CombineFn
class 的每个方法中添加了 System.out.println("***")
,以便跟踪它们何时 运行,但似乎它们并没有运行 也没有。
这里windowing设置有误吗?我们遵循了 https://beam.apache.org/documentation/programming-guide/#windowing 中的一个示例,它看起来相当简单,但显然缺少一些基本的东西。
感谢任何见解 - 提前致谢!
看起来主要问题确实是缺少触发器 - window 正在打开,但没有告诉它何时发出结果。我们想简单地 window 基于处理时间(而不是事件时间),所以做了以下事情:
.apply("Window", Window
.<Event>into(new GlobalWindows())
.triggering(Repeatedly
.forever(AfterProcessingTime
.pastFirstElementInPane()
.plusDelayOf(Duration.standardSeconds(5))
)
)
.withAllowedLateness(Duration.ZERO).discardingFiredPanes()
)
本质上,这会创建一个全局 window,它会在处理第一个元素 5 秒后触发以发出事件。每次 window 关闭时,另一个在接收到元素后打开。当我们没有 withAllowedLateness
片段时,Beam 抱怨 - 据我所知,这只是告诉它忽略任何迟到的数据。
我的理解可能有点离题,但是上面的代码片段已经解决了我们的问题!
我们正在尝试在 Apache Beam 管道上使用固定 windows(使用 DirectRunner
)。我们的流程如下:
- 从pub/sub 中提取数据
- 将JSON反序列化为Java对象
- Window 事件固定 windows 5 秒
- 使用自定义
CombineFn
,将Event
中的每个 window 组合成一个List<Event>
- 为了测试,简单输出结果
List<Event>
流水线代码:
pipeline
// Read from pubsub topic to create unbounded PCollection
.apply(PubsubIO
.<String>read()
.topic(options.getTopic())
.withCoder(StringUtf8Coder.of())
)
// Deserialize JSON into Event object
.apply("ParseEvent", ParDo
.of(new ParseEventFn())
)
// Window events with a fixed window size of 5 seconds
.apply("Window", Window
.<Event>into(FixedWindows
.of(Duration.standardSeconds(5))
)
)
// Group events by window
.apply("CombineEvents", Combine
.globally(new CombineEventsFn())
.withoutDefaults()
)
// Log grouped events
.apply("LogEvent", ParDo
.of(new LogEventFn())
);
我们看到的结果是最后一步永远不会 运行,因为我们没有得到任何日志记录。
此外,我们在自定义 CombineFn
class 的每个方法中添加了 System.out.println("***")
,以便跟踪它们何时 运行,但似乎它们并没有运行 也没有。
这里windowing设置有误吗?我们遵循了 https://beam.apache.org/documentation/programming-guide/#windowing 中的一个示例,它看起来相当简单,但显然缺少一些基本的东西。
感谢任何见解 - 提前致谢!
看起来主要问题确实是缺少触发器 - window 正在打开,但没有告诉它何时发出结果。我们想简单地 window 基于处理时间(而不是事件时间),所以做了以下事情:
.apply("Window", Window
.<Event>into(new GlobalWindows())
.triggering(Repeatedly
.forever(AfterProcessingTime
.pastFirstElementInPane()
.plusDelayOf(Duration.standardSeconds(5))
)
)
.withAllowedLateness(Duration.ZERO).discardingFiredPanes()
)
本质上,这会创建一个全局 window,它会在处理第一个元素 5 秒后触发以发出事件。每次 window 关闭时,另一个在接收到元素后打开。当我们没有 withAllowedLateness
片段时,Beam 抱怨 - 据我所知,这只是告诉它忽略任何迟到的数据。
我的理解可能有点离题,但是上面的代码片段已经解决了我们的问题!