Apache Beam 的 ID Window 会话
ID of Apache Beam Window Session
我正在将 TableRows
收集到 apache beam session windows 中,并希望在确定会话后为每一行添加某种会话 ID。
此代码对会话进行分区:
PCollection<TableRow> rows = ...; // "rows" looks like {{"id":1}, {"id":2}, {"id":3}}
PCollection<TableRow> sessionWindowedRows = rows.apply(
Window.<TableRow>into(Sessions.withGapDuration(Duration.standardSeconds(300))));
我想要这样的东西:
PCollection<TableRow> rows = ...; // "rows" looks like {{"id":1}, {"id":2}, {"id":3}}
PCollection<TableRow> sessionWindowedRows = rows.apply(
Window.<TableRow>into(Sessions.withGapDuration(Duration.standardSeconds(300))))
.apply("adding session id to table row", ParDo.of(new DoFn<TableRow, TableRow>() {
@ProcessElement
public void processElement(ProcessContext ctx) {
TableRow row = ctx.element();
row.put("sessionId", Sessions.getSessionOf(row); // Sessions.getSessionOf(row) IS NOT A REAL FUNCTION
ctx.output(row);
}
}));
所以最后 rows
看起来像
{{"id":1, "sessionId":lastSessionId1}, {"id":2, "sessionId":lastSessionId2}, {"id":3, "sessionId":lastSessionId3}}
我找到的最好的是这个关于自定义 windows 的 apache beam docs,但我无法从中理解如何做我想做的事。
在您的 @ProcessElement
函数中,您可以定义 BoundedWindow 类型的参数。从那里你可以得到元素分配的 maxTimestamp()
window;从而从中创建一个 sessionId。
@ProcessElement
public void processElement(ProcessContext ctx, BoundedWindow window) {
TableRow row = ctx.element();
row.put("sessionId", "session_" + window.maxTimestamp().toString());
ctx.output(row);
}
我支持 Iñigo 的断言,即您不需要 session window(或任何 window),除非您正在聚合。我想这会发生在您管道的其他地方吗?
我正在将 TableRows
收集到 apache beam session windows 中,并希望在确定会话后为每一行添加某种会话 ID。
此代码对会话进行分区:
PCollection<TableRow> rows = ...; // "rows" looks like {{"id":1}, {"id":2}, {"id":3}}
PCollection<TableRow> sessionWindowedRows = rows.apply(
Window.<TableRow>into(Sessions.withGapDuration(Duration.standardSeconds(300))));
我想要这样的东西:
PCollection<TableRow> rows = ...; // "rows" looks like {{"id":1}, {"id":2}, {"id":3}}
PCollection<TableRow> sessionWindowedRows = rows.apply(
Window.<TableRow>into(Sessions.withGapDuration(Duration.standardSeconds(300))))
.apply("adding session id to table row", ParDo.of(new DoFn<TableRow, TableRow>() {
@ProcessElement
public void processElement(ProcessContext ctx) {
TableRow row = ctx.element();
row.put("sessionId", Sessions.getSessionOf(row); // Sessions.getSessionOf(row) IS NOT A REAL FUNCTION
ctx.output(row);
}
}));
所以最后 rows
看起来像
{{"id":1, "sessionId":lastSessionId1}, {"id":2, "sessionId":lastSessionId2}, {"id":3, "sessionId":lastSessionId3}}
我找到的最好的是这个关于自定义 windows 的 apache beam docs,但我无法从中理解如何做我想做的事。
在您的 @ProcessElement
函数中,您可以定义 BoundedWindow 类型的参数。从那里你可以得到元素分配的 maxTimestamp()
window;从而从中创建一个 sessionId。
@ProcessElement
public void processElement(ProcessContext ctx, BoundedWindow window) {
TableRow row = ctx.element();
row.put("sessionId", "session_" + window.maxTimestamp().toString());
ctx.output(row);
}
我支持 Iñigo 的断言,即您不需要 session window(或任何 window),除非您正在聚合。我想这会发生在您管道的其他地方吗?