Apache Flink - 启用连接排序
Apache Flink - enable join ordering
我注意到 Apache Flink 没有优化表的连接顺序。目前,它保留用户指定的连接顺序(基本上,它按字面意思进行查询)。我想 Apache Calcite 可以优化连接的顺序,但由于某些原因,这些规则没有在 Apache Flink 中使用。
例如,如果我们有两个表 'R' 和 'S'
private val tableEnv: BatchTableEnvironment = TableEnvironment.getTableEnvironment(env)
private val fileNumber = 1
tableEnv.registerTableSource("R", getDataSourceR(fileNumber))
tableEnv.registerTableSource("S", getDataSourceS(fileNumber))
private val r = tableEnv.scan("R")
private val s = tableEnv.scan("S")
我们假设“S”是空的,我们想以两种方式连接这些表:
val tableOne = r.as("x1, x2").join(r.as("x3, x4")).where("x2 === x3").select("x1, x4")
.join(s.as("x5, x6")).where("x4 === x5 ").select("x1, x6")
val tableTwo = s.as("x1, x2").join(r.as("x3, x4")).where("x2 === x3").select("x1, x4")
.join(r.as("x5, x6")).where("x4 === x5 ").select("x1, x6")
如果我们想计算 tableOne 和 tableTwo 中的行数,结果在这两种情况下都将为零。
问题是评估 tableOne 将比评估 tableTwo.
花费更长的时间
有没有什么方法可以自动优化连接的执行顺序,甚至可以通过添加一些统计信息来启用可能的计划成本操作?如何添加这些统计数据?
在此 link 的文档中写道,也许有必要更改 Table 环境 CalciteConfig,但我不清楚该怎么做。
请帮忙。
Join 重新排序未启用,因为 Flink 不能很好地处理统计信息。在没有多少准确的基数估计的情况下重新排序连接基本上是赌博。因此,连接重新排序被禁用,表按照用户提供的顺序连接。这给出了确定性和可控的行为。
但是,您可以通过在创建 TableEnvironment
时将 TableConfig
和 CalciteConfig
传递给优化器,即 TableEnvironment.getTableEnvironment(env, yourTableConfig ).在 CalciteConfig
中,您可以将优化规则添加到不同的优化阶段。您可能希望将 JoinCommuteRule
和 JoinAssociateRule
添加到逻辑优化阶段。您可能还必须深入研究代码以检查如何将统计信息传递给优化器。
我注意到 Apache Flink 没有优化表的连接顺序。目前,它保留用户指定的连接顺序(基本上,它按字面意思进行查询)。我想 Apache Calcite 可以优化连接的顺序,但由于某些原因,这些规则没有在 Apache Flink 中使用。
例如,如果我们有两个表 'R' 和 'S'
private val tableEnv: BatchTableEnvironment = TableEnvironment.getTableEnvironment(env)
private val fileNumber = 1
tableEnv.registerTableSource("R", getDataSourceR(fileNumber))
tableEnv.registerTableSource("S", getDataSourceS(fileNumber))
private val r = tableEnv.scan("R")
private val s = tableEnv.scan("S")
我们假设“S”是空的,我们想以两种方式连接这些表:
val tableOne = r.as("x1, x2").join(r.as("x3, x4")).where("x2 === x3").select("x1, x4")
.join(s.as("x5, x6")).where("x4 === x5 ").select("x1, x6")
val tableTwo = s.as("x1, x2").join(r.as("x3, x4")).where("x2 === x3").select("x1, x4")
.join(r.as("x5, x6")).where("x4 === x5 ").select("x1, x6")
如果我们想计算 tableOne 和 tableTwo 中的行数,结果在这两种情况下都将为零。 问题是评估 tableOne 将比评估 tableTwo.
花费更长的时间有没有什么方法可以自动优化连接的执行顺序,甚至可以通过添加一些统计信息来启用可能的计划成本操作?如何添加这些统计数据?
在此 link 的文档中写道,也许有必要更改 Table 环境 CalciteConfig,但我不清楚该怎么做。
请帮忙。
Join 重新排序未启用,因为 Flink 不能很好地处理统计信息。在没有多少准确的基数估计的情况下重新排序连接基本上是赌博。因此,连接重新排序被禁用,表按照用户提供的顺序连接。这给出了确定性和可控的行为。
但是,您可以通过在创建 TableEnvironment
时将 TableConfig
和 CalciteConfig
传递给优化器,即 TableEnvironment.getTableEnvironment(env, yourTableConfig ).在 CalciteConfig
中,您可以将优化规则添加到不同的优化阶段。您可能希望将 JoinCommuteRule
和 JoinAssociateRule
添加到逻辑优化阶段。您可能还必须深入研究代码以检查如何将统计信息传递给优化器。