"Rowtime attributes must not be in the input rows of a regular join" 尽管使用间隔连接,但仅使用事件时间戳

"Rowtime attributes must not be in the input rows of a regular join" despite using interval join, but only with event timestamp

示例代码:

from pyflink.table import EnvironmentSettings, StreamTableEnvironment


env_settings = (
    EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
)
table_env = StreamTableEnvironment.create(environment_settings=env_settings)

table_env.execute_sql(
    """
    CREATE TABLE table1 (
        id INT,
        ts TIMESTAMP(3),
        WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
    ) WITH (
        'connector.type' = 'filesystem',
        'format.type' = 'csv',
        'connector.path' = '/home/alex/work/test-flink/data1.csv'
    )
"""
)


table_env.execute_sql(
    """
    CREATE TABLE table2 (
        id2 INT,
        ts2 TIMESTAMP(3),
        WATERMARK FOR ts2 AS ts2 - INTERVAL '5' SECOND
    ) WITH (
        'connector.type' = 'filesystem',
        'format.type' = 'csv',
        'connector.path' = '/home/alex/work/test-flink/data2.csv'
    )
"""
)

table1 = table_env.from_path("table1")
table2 = table_env.from_path("table2")

print(table1.join(table2).where("ts = ts2 && id = id2").select("id, ts").to_pandas())

报错:

py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.flink.table.runtime.arrow.ArrowUtils.collectAsPandasDataFrame.
: org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query: 

FlinkLogicalLegacySink(name=[collect], fields=[id, ts])
+- FlinkLogicalCalc(select=[id, ts])
   +- FlinkLogicalJoin(condition=[AND(=(, ), =([=12=], ))], joinType=[inner])
      :- FlinkLogicalCalc(select=[id, ts, CAST(ts) AS ts0])
      :  +- FlinkLogicalWatermarkAssigner(rowtime=[ts], watermark=[-(, 5000:INTERVAL SECOND)])
      :     +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, table1, source: [CsvTableSource(read fields: id, ts)]]], fields=[id, ts])
      +- FlinkLogicalCalc(select=[id2, ts2, CAST(ts2) AS ts20])
         +- FlinkLogicalWatermarkAssigner(rowtime=[ts2], watermark=[-(, 5000:INTERVAL SECOND)])
            +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, table2, source: [CsvTableSource(read fields: id2, ts2)]]], fields=[id2, ts2])

Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.

这似乎与 this one because I have followed the instructions in the docs 等其他类似问题不同,它指定了等值连接和时间间隔连接 (ts = ts2 && id = id2):

An interval join requires at least one equi-join predicate and a join condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<, <=, >=, >) or a single equality predicate that compares time attributes of the same type (i.e., processing time or event time) of both input tables.

For example, the following predicates are valid interval join conditions:

  • ltime = rtime

如果问题是这些不是仅附加表,我不知道如何使它们如此。

设置时间特性没有帮助:

StreamExecutionEnvironment.get_execution_environment().set_stream_time_characteristic(
    TimeCharacteristic.EventTime
)

如果我使用处理时间而不是 ts AS PROCTIME(),则查询成功。但我想我需要使用事件时间,我不明白为什么会有这种差异。

SQL 中两个常规表之间的联接始终使用 FROM a, ba JOIN b 以相同的方式表示。

但是,Flink 在底层为相同的语法提供了两种类型的连接运算符。一种是 interval join,它需要时间属性根据时间将两个表相互关联。一个是 常规 SQL 联接 ,它以您从数据库中了解到的通用方式实现。

间隔连接只是一种流式优化,可在运行时保持较低的状态大小,并且不会在结果中产生更新。常规 SQL 连接运算符最终可以产生与 an 间隔相同的结果,但维护成本更高。

为了区分间隔连接和常规连接,优化器在 WHERE 子句中搜索对时间属性起作用的谓词。对于间隔连接,输出始终可以包含两个行时间属性,用于外部时态操作(下游时态运算符)。因为这两个行时间属性仍然与底层水印系统保持一致。这意味着例如外部 window 或其他间隔连接可以再次使用时间属性。

但是,区间连接的实现有一些已知的缺点,FLINK-10211 中也有介绍。由于糟糕的设计,我们无法在某些位置区分间隔连接和常规连接。因此,我们需要假设常规连接可以是间隔连接,并且不能自动将用户的时间属性转换为 TIMESTAMP。相反,我们目前禁止在常规连接的输出中使用时间属性。

在某个时候,这个限制有望消失,直到那时用户有两种可能性:

  1. 不要对包含时间属性的表使用常规连接。您也可以使用嵌套的 SELECT 子句将其投射出去,或者在加入之前执行 CAST

  2. 使用 SELECT 子句中的 CAST(col AS TIMESTAMP) 将时间属性转换为常规时间戳。它将被下推到连接操作中。

您的异常表明您使用的是常规联接。间隔连接需要一个范围来操作(即使它只有 1 毫秒)。他们不支持平等。