Apache Beam 的 ID Window 会话

ID of Apache Beam Window Session

我正在将 TableRows 收集到 apache beam session windows 中,并希望在确定会话后为每一行添加某种会话 ID。

此代码对会话进行分区:

PCollection<TableRow> rows = ...; // "rows" looks like {{"id":1}, {"id":2}, {"id":3}}
PCollection<TableRow> sessionWindowedRows = rows.apply(
    Window.<TableRow>into(Sessions.withGapDuration(Duration.standardSeconds(300))));

我想要这样的东西:

PCollection<TableRow> rows = ...; // "rows" looks like {{"id":1}, {"id":2}, {"id":3}}
PCollection<TableRow> sessionWindowedRows = rows.apply(
    Window.<TableRow>into(Sessions.withGapDuration(Duration.standardSeconds(300))))
    .apply("adding session id to table row", ParDo.of(new DoFn<TableRow, TableRow>() {
        @ProcessElement
        public void processElement(ProcessContext ctx) {
            TableRow row = ctx.element();
            row.put("sessionId", Sessions.getSessionOf(row); // Sessions.getSessionOf(row) IS NOT A REAL FUNCTION
            ctx.output(row);
        }
}));

所以最后 rows 看起来像

{{"id":1, "sessionId":lastSessionId1}, {"id":2, "sessionId":lastSessionId2}, {"id":3, "sessionId":lastSessionId3}}

我找到的最好的是这个关于自定义 windows 的 apache beam docs,但我无法从中理解如何做我想做的事。

在您的 @ProcessElement 函数中,您可以定义 BoundedWindow 类型的参数。从那里你可以得到元素分配的 maxTimestamp() window;从而从中创建一个 sessionId。

@ProcessElement
public void processElement(ProcessContext ctx, BoundedWindow window) {
    TableRow row = ctx.element();
    row.put("sessionId", "session_" + window.maxTimestamp().toString());
    ctx.output(row);
}

我支持 Iñigo 的断言,即您不需要 session window(或任何 window),除非您正在聚合。我想这会发生在您管道的其他地方吗?