执行 Apache BEAM sql 查询时出错 - 在 GroupByKey 之前使用 Window.into 或 Window.triggering 转换

Error executing Apache BEAM sql query - Use a Window.into or Window.triggering transform prior to GroupByKey

如何在 BEAM SQL 中的 GroupByKey 之前包含 Window.into 或 Window.triggering 转换?

我有以下 2 tables:

是table

CREATE TABLE table1(
field1 varchar 
,field2 varchar
)

第 2 Table

CREATE TABLE table2(
field1 varchar
,field3 varchar
)

我正在第三次写结果Table

CREATE TABLE table3(
field1 varchar
,field3 varchar
)

前 2 个 table 正在从 kafka 流中读取数据,我正在对这些 table 进行连接并将数据插入第三个 table,使用以下命令询问。前 2 table 是 un-bounded/non-bounded

INSERT INTO table3 
        (field1, 
         field3) 
SELECT a.field1, 
   b.field3 
FROM   table1 a 
   JOIN table2 b 
     ON a.field1 = b.field1 

我收到以下错误:

Caused by: java.lang.IllegalStateException: GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without a trigger. Use a Window.into or Window.triggering transform prior to GroupByKey. at org.apache.beam.sdk.transforms.GroupByKey.applicableTo(GroupByKey.java:173) at org.apache.beam.sdk.transforms.GroupByKey.expand(GroupByKey.java:204) at org.apache.beam.sdk.transforms.GroupByKey.expand(GroupByKey.java:120) at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537) at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:472) at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:286) at org.apache.beam.sdk.transforms.join.CoGroupByKey.expand(CoGroupByKey.java:126) at org.apache.beam.sdk.transforms.join.CoGroupByKey.expand(CoGroupByKey.java:74) at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537) at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:472) at org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple.apply(KeyedPCollectionTuple.java:107) at org.apache.beam.sdk.extensions.joinlibrary.Join.innerJoin(Join.java:59) at org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRel.standardJoin(BeamJoinRel.java:217) at org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRel.buildBeamPipeline(BeamJoinRel.java:161) at org.apache.beam.sdk.extensions.sql.impl.rel.BeamProjectRel.buildBeamPipeline(BeamProjectRel.java:68) at org.apache.beam.sdk.extensions.sql.impl.rel.BeamAggregationRel.buildBeamPipeline(BeamAggregationRel.java:80) at org.apache.beam.sdk.extensions.sql.impl.rel.BeamIOSinkRel.buildBeamPipeline(BeamIOSinkRel.java:64) at org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner.compileBeamPipeline(BeamQueryPlanner.java:127) at com.dss.tss.v2.client.BeamSqlCli.compilePipeline(BeamSqlCli.java:95) at com.dss.test.v2.client.SQLCli.main(SQLCli.java:100)

这是 Beam 当前的实施限制 SQL。你需要 define windows and then join the inputs per-window.

几个如何在 Beam 中进行连接和 windowing 的示例 SQL:

  • complex SQL queryHOP window 并加入;
  • test 在 SQL 之外的 Java 中定义了一个 window,然后使用 join 应用查询;
  • 可以找到其他 window 函数的示例 here;

背景

这个问题是由于一般的无界数据流的Join操作很难定义,不限于Beam SQL.

想象一下,例如,当数据处理系统从 2 个源接收输入,然后必须匹配它们之间的记录时。从高层次的角度来看,这样的系统必须保留它到目前为止看到的所有数据,然后对于每条新记录,它必须遍历第二个输入源中的所有记录,看看那里是否有匹配项。当您的数据源有限且较小时,它可以正常工作。在简单的情况下,您可以将所有内容加载到内存中,匹配来自源的数据,生成输出。

对于流式数据,您不能永远缓存它。如果数据永远不会停止怎么办?并且不清楚您何时要发出数据。如果你有一个 outer join 操作,你什么时候决定你没有来自另一个输入的匹配记录?

例如,请参阅 explanation for the unbounded PCollections in GroupByKey section of the Beam guide. And Joins in Beam are usually implemented on top of it using CoGroupByKey(梁 SQL 也加入)。

所有这些问题都可能针对特定管道得到回答,但在一般情况下很难解决。 Beam SDK 和 Beam SQL 中的当前方法是将其委托给用户来解决具体的业务案例。 Beam 允许用户决定将哪些数据聚合到 window, how long to wait for late data, and when to emit the results. There are also things like state cells and timers 中以进行更精细的控制。这允许编写管道的程序员明确定义行为并在一定程度上解决这些问题,但具有(很多)额外的复杂性。

Beam SQL 是在常规 Beam SDK 概念之上实现的,并受到相同限制的约束。但它有更多自己的实现。例如,您没有 SQL 语法来定义触发器、状态或自定义 windows。或者您不能编写可以在外部服务中保持状态的自定义 ParDo