会话 Window 阻止 GroupByKey 工作

Session Window preventing GroupByKey from working

我有一个传入的事件流,每个事件都已经有一个来自另一个进程的关联 sessionId。

我想做的就是使用自定义 CombineFn 将这些事件组合到一个会话对象中。

在开发过程中,我使用了一个从文件中读取的有界数据集,以下代码似乎有效:

input.apply(ParDo.named("ParseEvent").of(new ParseEventFn()))
    .setCoder(KvCoder.of(StringUtf8Coder.of(), AvroCoder.of(Event.class)))
    .apply(GroupByKey.<String, Event>create())
    .apply(Combine.groupedValues(new SessionAccumulator()))

以上代码(使用 input/output 处理)将输出一系列会话,每个会话都有多个事件。

{sessionId: 1, events: [event1,event2,event3]}
{sessionId: 2, events: [event4,event5]}

但是为了让它在无限数据集上工作,我需要应用一个窗口函数,在本例中是一个 SessionWindow。

input.apply(ParDo.named("ParseEvent").of(new ParseEventFn()))
    .setCoder(KvCoder.of(StringUtf8Coder.of(), AvroCoder.of(Event.class)))
    .apply(Window.<KV<String, Event>>into(Sessions.withGapDuration(Duration.standardMinutes(30))))
    .apply(GroupByKey.<String, Event>create())
    .apply(Combine.groupedValues(new SessionAccumulator()))

在那种情况下,唯一的新代码是 Windowing 函数,而不是汇总事件,我在它自己的会话中获取每个事件,如下所示:

{sessionId: 1, events: [event1]}
{sessionId: 1, events: [event2]}
{sessionId: 1, events: [event3]}
{sessionId: 2, events: [event4]}
{sessionId: 2, events: [event5]}

知道为什么会这样吗?

编辑:我应该补充一点,ParseEventFn 正在使用 context.outputWithTimestamp() 将时间戳应用于 PCollection,并且该时间戳似乎是正确的。

进一步深入研究,在我的案例中,问题是我认为时间戳是正确的核心假设是错误的。

我在开窗前应用的时间戳是错误的。

Windowing 正在做它应该做的事情,但我把我的时间戳设置得太远了,它为每个事件创建了单独的会话。

糟糕

对于您的情况,您可以编写自己的 WindowFn。如果您将密钥设置为会话 ID,则较大的间隙持续时间也有效,但它并不能很好地反映您的数据和计算的性质。

WindowFn 的成分是:

  • 您自己的 BoundedWindow 子类,在这种情况下,您将创建一个 window 类型,在字段
  • 中包含会话 ID
  • assignWindows,您可以在其中将每个元素分配给由会话 ID 标识的 window。 window 的长度仍然很重要,因为它控制 window 何时过期并被垃圾收集。
  • mergeWindows,您可以在其中合并所有具有相同会话 ID 的 windows。它们不必落入任何特定的间隙持续时间。

您需要注意的另一件事是,管理这些 windows 的垃圾收集的水印由您的无限事件流的来源决定。因此,在 ParDo.of(new ParseEventFn()) 中设置时间戳为时已晚,无法影响水印。您可能丢失了想要保留的数据。