How to modify the execution plan in Spark?

I am getting an execution plan in JSON format:

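import org.apache.spark.sql.DataFrame

val df: DataFrame = ???  // placeholder for the DataFrame whose plan is inspected
val jsonPlan: String = df.queryExecution.optimizedPlan.toJSON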

In the JSON plan output, you can see that the InMemoryRelation operator has a child key: "child": [..].

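Is it possible to add user information to the plan tree nodes before calling toJSON, so that after calling toJSON the added information is serialized together with the tree nodes?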

For example, I want to add a label ("it a cute child") to every child node. The result would then look like this:

"child":[
  {
    "class":"org.apache.spark.sql.execution.WholeStageCodegenExec",
    "num-children":1,
    "child":0,
    "codegenStageId":2,
    // My Added Information
    "label": "it a cute child"
  },
  ...
 ]

Full JSON plan:

[
  {
    "class":"org.apache.spark.sql.execution.columnar.InMemoryRelation",
    "num-children":0,
    "output":[
      [
        {
          "class":"org.apache.spark.sql.catalyst.expressions.AttributeReference",
          "num-children":0,
          "name":"criminal_name",
          "dataType":"string",
          "nullable":true,
          "metadata":{
          },
          "exprId":{
            "product-class":"org.apache.spark.sql.catalyst.expressions.ExprId",
            "id":73,
            "jvmId":"2996f433-9e44-40c1-9aaf-d74c0768f68b"
          },
          "qualifier":[
          ]
        }],
      [
        {
          "class":"org.apache.spark.sql.catalyst.expressions.AttributeReference",
          "num-children":0,
          "name":"punishment",
          "dataType":"string",
          "nullable":true,
          "metadata":{
          },
          "exprId":{
            "product-class":"org.apache.spark.sql.catalyst.expressions.ExprId",
            "id":78,
            "jvmId":"2996f433-9e44-40c1-9aaf-d74c0768f68b"
          },
          "qualifier":[
          ]
        }]],
    "cacheBuilder":null,
    "outputOrdering":[
    ],
    "child":[
      {
        "class":"org.apache.spark.sql.execution.WholeStageCodegenExec",
        "num-children":1,
        "child":0,
        "codegenStageId":2
      },
      {
        "class":"org.apache.spark.sql.execution.ProjectExec",
        "num-children":1,
        "projectList":[
          [
            {
              "class":"org.apache.spark.sql.catalyst.expressions.Alias",
              "num-children":1,
              "child":0,
              "name":"criminal_name",
              "exprId":{
                "product-class":"org.apache.spark.sql.catalyst.expressions.ExprId",
                "id":73,
                "jvmId":"2996f433-9e44-40c1-9aaf-d74c0768f68b"
              },
              "qualifier":[
              ],
              "explicitMetadata":{
              }
            },
            {
              "class":"org.apache.spark.sql.catalyst.expressions.AttributeReference",
              "num-children":0,
              "name":"name",
              "dataType":"string",
              "nullable":true,
              "metadata":{
              },
              "exprId":{
                "product-class":"org.apache.spark.sql.catalyst.expressions.ExprId",
                "id":56,
                "jvmId":"2996f433-9e44-40c1-9aaf-d74c0768f68b"
              },
              "qualifier":"[spark_catalog, murphy, staff_dossiers]"
            }],
          [
            {
              "class":"org.apache.spark.sql.catalyst.expressions.Alias",
              "num-children":1,
              "child":0,
              "name":"punishment",
              "exprId":{
                "product-class":"org.apache.spark.sql.catalyst.expressions.ExprId",
                "id":78,
                "jvmId":"2996f433-9e44-40c1-9aaf-d74c0768f68b"
              },
              "qualifier":[
              ],
              "explicitMetadata":{
              }
            },
            {
              "class":"org.apache.spark.sql.catalyst.expressions.ScalaUDF",
              "num-children":1,
              "function":null,
              "dataType":"string",
              "children":[
                0],
              "inputEncoders":null,
              "nullable":true,
              "udfDeterministic":true
            },
            {
              "class":"org.apache.spark.sql.catalyst.expressions.AttributeReference",
              "num-children":0,
              "name":"hobby",
              "dataType":"string",
              "nullable":true,
              "metadata":{
              },
              "exprId":{
                "product-class":"org.apache.spark.sql.catalyst.expressions.ExprId",
                "id":64,
                "jvmId":"2996f433-9e44-40c1-9aaf-d74c0768f68b"
              },
              "qualifier":"[spark_catalog, murphy, indicators]"
            }]],
        "child":0
      },
      {
        "class":"org.apache.spark.sql.execution.joins.BroadcastHashJoinExec",
        "num-children":2,
        "leftKeys":[
          [
            {
              "class":"org.apache.spark.sql.catalyst.expressions.AttributeReference",
              "num-children":0,
              "name":"name",
              "dataType":"string",
              "nullable":true,
              "metadata":{
              },
              "exprId":{
                "product-class":"org.apache.spark.sql.catalyst.expressions.ExprId",
                "id":56,
                "jvmId":"2996f433-9e44-40c1-9aaf-d74c0768f68b"
              },
              "qualifier":"[spark_catalog, murphy, staff_dossiers]"
            }]],
        "rightKeys":[
          [
            {
              "class":"org.apache.spark.sql.catalyst.expressions.AttributeReference",
              "num-children":0,
              "name":"name",
              "dataType":"string",
              "nullable":true,
              "metadata":{
              },
              "exprId":{
                "product-class":"org.apache.spark.sql.catalyst.expressions.ExprId",
                "id":62,
                "jvmId":"2996f433-9e44-40c1-9aaf-d74c0768f68b"
              },
              "qualifier":"[spark_catalog, murphy, indicators]"
            }]],
        "joinType":{
          "object":"org.apache.spark.sql.catalyst.plans.Inner$"
        },
        "buildSide":{
          "object":"org.apache.spark.sql.execution.joins.package$BuildLeft$"
        },
        "left":0,
        "right":1
      },
      {
        "class":"org.apache.spark.sql.execution.InputAdapter",
        "num-children":1,
        "child":0
      },
      {
        "class":"org.apache.spark.sql.execution.exchange.BroadcastExchangeExec",
        "num-children":1,
        "mode":{
          "product-class":"org.apache.spark.sql.execution.joins.HashedRelationBroadcastMode",
          "key":[
            [
              {
                "class":"org.apache.spark.sql.catalyst.expressions.BoundReference",
                "num-children":0,
                "ordinal":0,
                "dataType":"string",
                "nullable":true
              }]]
        },
        "child":0
      },
      {
        "class":"org.apache.spark.sql.execution.WholeStageCodegenExec",
        "num-children":1,
        "child":0,
        "codegenStageId":1
      },
      {
        "class":"org.apache.spark.sql.execution.ProjectExec",
        "num-children":1,
        "projectList":[
          [
            {
              "class":"org.apache.spark.sql.catalyst.expressions.AttributeReference",
              "num-children":0,
              "name":"name",
              "dataType":"string",
              "nullable":true,
              "metadata":{
              },
              "exprId":{
                "product-class":"org.apache.spark.sql.catalyst.expressions.ExprId",
                "id":56,
                "jvmId":"2996f433-9e44-40c1-9aaf-d74c0768f68b"
              },
              "qualifier":[
              ]
            }]],
        "child":0
      },
      {
        "class":"org.apache.spark.sql.execution.FilterExec",
        "num-children":1,
        "condition":[
          {
            "class":"org.apache.spark.sql.catalyst.expressions.And",
            "num-children":2,
            "left":0,
            "right":1
          },
          {
            "class":"org.apache.spark.sql.catalyst.expressions.IsNotNull",
            "num-children":1,
            "child":0
          },
          {
            "class":"org.apache.spark.sql.catalyst.expressions.AttributeReference",
            "num-children":0,
            "name":"name",
            "dataType":"string",
            "nullable":true,
            "metadata":{
            },
            "exprId":{
              "product-class":"org.apache.spark.sql.catalyst.expressions.ExprId",
              "id":56,
              "jvmId":"2996f433-9e44-40c1-9aaf-d74c0768f68b"
            },
            "qualifier":"[spark_catalog, murphy, staff_dossiers]"
          },
          {
            "class":"org.apache.spark.sql.catalyst.expressions.EqualTo",
            "num-children":2,
            "left":0,
            "right":1
          },
          {
            "class":"org.apache.spark.sql.catalyst.expressions.AttributeReference",
            "num-children":0,
            "name":"name",
            "dataType":"string",
            "nullable":true,
            "metadata":{
            },
            "exprId":{
              "product-class":"org.apache.spark.sql.catalyst.expressions.ExprId",
              "id":56,
              "jvmId":"2996f433-9e44-40c1-9aaf-d74c0768f68b"
            },
            "qualifier":"[spark_catalog, murphy, staff_dossiers]"
          },
          {
            "class":"org.apache.spark.sql.catalyst.expressions.Literal",
            "num-children":0,
            "value":"Nikita",
            "dataType":"string"
          }],
        "child":0
      },
      {
        "class":"org.apache.spark.sql.execution.ColumnarToRowExec",
        "num-children":1,
        "child":0
      },
      {
        "class":"org.apache.spark.sql.execution.InputAdapter",
        "num-children":1,
        "child":0
      },
      {
        "class":"org.apache.spark.sql.execution.FileSourceScanExec",
        "num-children":0,
        "relation":null,
        "output":[
          [
            {
              "class":"org.apache.spark.sql.catalyst.expressions.AttributeReference",
              "num-children":0,
              "name":"name",
              "dataType":"string",
              "nullable":true,
              "metadata":{
              },
              "exprId":{
                "product-class":"org.apache.spark.sql.catalyst.expressions.ExprId",
                "id":56,
                "jvmId":"2996f433-9e44-40c1-9aaf-d74c0768f68b"
              },
              "qualifier":[
              ]
            }]],
        "requiredSchema":{
          "type":"struct",
          "fields":[
            {
              "name":"name",
              "type":"string",
              "nullable":true,
              "metadata":{
              }
            }]
        },
        "partitionFilters":[
        ],
        "dataFilters":[
          [
            {
              "class":"org.apache.spark.sql.catalyst.expressions.IsNotNull",
              "num-children":1,
              "child":0
            },
            {
              "class":"org.apache.spark.sql.catalyst.expressions.AttributeReference",
              "num-children":0,
              "name":"name",
              "dataType":"string",
              "nullable":true,
              "metadata":{
              },
              "exprId":{
                "product-class":"org.apache.spark.sql.catalyst.expressions.ExprId",
                "id":56,
                "jvmId":"2996f433-9e44-40c1-9aaf-d74c0768f68b"
              },
              "qualifier":"[spark_catalog, murphy, staff_dossiers]"
            }],
          [
            {
              "class":"org.apache.spark.sql.catalyst.expressions.EqualTo",
              "num-children":2,
              "left":0,
              "right":1
            },
            {
              "class":"org.apache.spark.sql.catalyst.expressions.AttributeReference",
              "num-children":0,
              "name":"name",
              "dataType":"string",
              "nullable":true,
              "metadata":{
              },
              "exprId":{
                "product-class":"org.apache.spark.sql.catalyst.expressions.ExprId",
                "id":56,
                "jvmId":"2996f433-9e44-40c1-9aaf-d74c0768f68b"
              },
              "qualifier":"[spark_catalog, murphy, staff_dossiers]"
            },
            {
              "class":"org.apache.spark.sql.catalyst.expressions.Literal",
              "num-children":0,
              "value":"Nikita",
              "dataType":"string"
            }]],
        "tableIdentifier":{
          "product-class":"org.apache.spark.sql.catalyst.TableIdentifier",
          "table":"staff_dossiers",
          "database":"murphy"
        }
      },
      {
        "class":"org.apache.spark.sql.execution.ProjectExec",
        "num-children":1,
        "projectList":[
          [
            {
              "class":"org.apache.spark.sql.catalyst.expressions.AttributeReference",
              "num-children":0,
              "name":"name",
              "dataType":"string",
              "nullable":true,
              "metadata":{
              },
              "exprId":{
                "product-class":"org.apache.spark.sql.catalyst.expressions.ExprId",
                "id":62,
                "jvmId":"2996f433-9e44-40c1-9aaf-d74c0768f68b"
              },
              "qualifier":[
              ]
            }],
          [
            {
              "class":"org.apache.spark.sql.catalyst.expressions.AttributeReference",
              "num-children":0,
              "name":"hobby",
              "dataType":"string",
              "nullable":true,
              "metadata":{
              },
              "exprId":{
                "product-class":"org.apache.spark.sql.catalyst.expressions.ExprId",
                "id":64,
                "jvmId":"2996f433-9e44-40c1-9aaf-d74c0768f68b"
              },
              "qualifier":[
              ]
            }]],
        "child":0
      },
      {
        "class":"org.apache.spark.sql.execution.FilterExec",
        "num-children":1,
        "condition":[
          {
            "class":"org.apache.spark.sql.catalyst.expressions.And",
            "num-children":2,
            "left":0,
            "right":1
          },
          {
            "class":"org.apache.spark.sql.catalyst.expressions.EqualTo",
            "num-children":2,
            "left":0,
            "right":1
          },
          {
            "class":"org.apache.spark.sql.catalyst.expressions.AttributeReference",
            "num-children":0,
            "name":"name",
            "dataType":"string",
            "nullable":true,
            "metadata":{
            },
            "exprId":{
              "product-class":"org.apache.spark.sql.catalyst.expressions.ExprId",
              "id":62,
              "jvmId":"2996f433-9e44-40c1-9aaf-d74c0768f68b"
            },
            "qualifier":"[spark_catalog, murphy, indicators]"
          },
          {
            "class":"org.apache.spark.sql.catalyst.expressions.Literal",
            "num-children":0,
            "value":"Nikita",
            "dataType":"string"
          },
          {
            "class":"org.apache.spark.sql.catalyst.expressions.IsNotNull",
            "num-children":1,
            "child":0
          },
          {
            "class":"org.apache.spark.sql.catalyst.expressions.AttributeReference",
            "num-children":0,
            "name":"name",
            "dataType":"string",
            "nullable":true,
            "metadata":{
            },
            "exprId":{
              "product-class":"org.apache.spark.sql.catalyst.expressions.ExprId",
              "id":62,
              "jvmId":"2996f433-9e44-40c1-9aaf-d74c0768f68b"
            },
            "qualifier":"[spark_catalog, murphy, indicators]"
          }],
        "child":0
      },
      {
        "class":"org.apache.spark.sql.execution.ColumnarToRowExec",
        "num-children":1,
        "child":0
      },
      {
        "class":"org.apache.spark.sql.execution.InputAdapter",
        "num-children":1,
        "child":0
      },
      {
        "class":"org.apache.spark.sql.execution.FileSourceScanExec",
        "num-children":0,
        "relation":null,
        "output":[
          [
            {
              "class":"org.apache.spark.sql.catalyst.expressions.AttributeReference",
              "num-children":0,
              "name":"name",
              "dataType":"string",
              "nullable":true,
              "metadata":{
              },
              "exprId":{
                "product-class":"org.apache.spark.sql.catalyst.expressions.ExprId",
                "id":62,
                "jvmId":"2996f433-9e44-40c1-9aaf-d74c0768f68b"
              },
              "qualifier":[
              ]
            }],
          [
            {
              "class":"org.apache.spark.sql.catalyst.expressions.AttributeReference",
              "num-children":0,
              "name":"hobby",
              "dataType":"string",
              "nullable":true,
              "metadata":{
              },
              "exprId":{
                "product-class":"org.apache.spark.sql.catalyst.expressions.ExprId",
                "id":64,
                "jvmId":"2996f433-9e44-40c1-9aaf-d74c0768f68b"
              },
              "qualifier":[
              ]
            }]],
        "requiredSchema":{
          "type":"struct",
          "fields":[
            {
              "name":"name",
              "type":"string",
              "nullable":true,
              "metadata":{
              }
            },
            {
              "name":"hobby",
              "type":"string",
              "nullable":true,
              "metadata":{
              }
            }]
        },
        "partitionFilters":[
        ],
        "dataFilters":[
          [
            {
              "class":"org.apache.spark.sql.catalyst.expressions.EqualTo",
              "num-children":2,
              "left":0,
              "right":1
            },
            {
              "class":"org.apache.spark.sql.catalyst.expressions.AttributeReference",
              "num-children":0,
              "name":"name",
              "dataType":"string",
              "nullable":true,
              "metadata":{
              },
              "exprId":{
                "product-class":"org.apache.spark.sql.catalyst.expressions.ExprId",
                "id":62,
                "jvmId":"2996f433-9e44-40c1-9aaf-d74c0768f68b"
              },
              "qualifier":"[spark_catalog, murphy, indicators]"
            },
            {
              "class":"org.apache.spark.sql.catalyst.expressions.Literal",
              "num-children":0,
              "value":"Nikita",
              "dataType":"string"
            }],
          [
            {
              "class":"org.apache.spark.sql.catalyst.expressions.IsNotNull",
              "num-children":1,
              "child":0
            },
            {
              "class":"org.apache.spark.sql.catalyst.expressions.AttributeReference",
              "num-children":0,
              "name":"name",
              "dataType":"string",
              "nullable":true,
              "metadata":{
              },
              "exprId":{
                "product-class":"org.apache.spark.sql.catalyst.expressions.ExprId",
                "id":62,
                "jvmId":"2996f433-9e44-40c1-9aaf-d74c0768f68b"
              },
              "qualifier":"[spark_catalog, murphy, indicators]"
            }]],
        "tableIdentifier":{
          "product-class":"org.apache.spark.sql.catalyst.TableIdentifier",
          "table":"indicators",
          "database":"murphy"
        }
      }
    ]
  }
]
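Note that the "child" array of InMemoryRelation is not nested: Spark's `TreeNode.toJSON` writes a tree as a flat, pre-order list of nodes, each carrying "num-children", and fields such as "child":0, "left":0 and "right":1 are indexes into that node's own children (expression arrays such as "condition" use the same encoding). The following is a minimal, dependency-free sketch that rebuilds the nesting from the class names and "num-children" values copied from the plan above; `FlatNode`, `PlanTree` and `FlatPlan` are hypothetical names used only for this illustration.

```scala
// One entry of the flat "child" array: only what is needed to rebuild the shape.
final case class FlatNode(cls: String, numChildren: Int)

// The nested tree rebuilt from the flat, pre-order list.
final case class PlanTree(cls: String, children: List[PlanTree])

object FlatPlan {
  // Consumes one subtree from the front of `nodes` and returns it
  // together with the entries that were not consumed.
  def rebuild(nodes: List[FlatNode]): (PlanTree, List[FlatNode]) = nodes match {
    case Nil => throw new IllegalArgumentException("no node left to consume")
    case head :: tail =>
      val (kids, rest) = (1 to head.numChildren).foldLeft((List.empty[PlanTree], tail)) {
        case ((acc, remaining), _) =>
          val (child, after) = rebuild(remaining)
          (acc :+ child, after)
      }
      (PlanTree(head.cls, kids), rest)
  }

  def leaves(t: PlanTree): List[PlanTree] =
    if (t.children.isEmpty) List(t) else t.children.flatMap(leaves)

  // Indented, one node per line.
  def show(t: PlanTree, indent: Int = 0): String =
    ((" " * indent + t.cls) :: t.children.map(c => show(c, indent + 2))).mkString("\n")

  // "class" (package prefix dropped) and "num-children" of each entry,
  // in the order they appear in the InMemoryRelation "child" array above.
  val cachedPlanNodes: List[FlatNode] = List(
    FlatNode("WholeStageCodegenExec", 1),
    FlatNode("ProjectExec", 1),
    FlatNode("BroadcastHashJoinExec", 2),
    FlatNode("InputAdapter", 1),
    FlatNode("BroadcastExchangeExec", 1),
    FlatNode("WholeStageCodegenExec", 1),
    FlatNode("ProjectExec", 1),
    FlatNode("FilterExec", 1),
    FlatNode("ColumnarToRowExec", 1),
    FlatNode("InputAdapter", 1),
    FlatNode("FileSourceScanExec", 0),
    FlatNode("ProjectExec", 1),
    FlatNode("FilterExec", 1),
    FlatNode("ColumnarToRowExec", 1),
    FlatNode("InputAdapter", 1),
    FlatNode("FileSourceScanExec", 0)
  )

  def main(args: Array[String]): Unit = {
    val (root, rest) = rebuild(cachedPlanNodes)
    require(rest.isEmpty, "every flat entry should belong to the tree")
    println(show(root))
  }
}
```

Rebuilt this way, the plan shows the BroadcastHashJoinExec with two subtrees, each ending in one of the two FileSourceScanExec nodes (staff_dossiers and indicators).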

Of course, you can parse and modify any JSON object in memory, but that has nothing to do with Spark. Related: What JSON library to use in Scala?
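A minimal sketch of that post-processing step, assuming json4s-jackson (Spark itself uses json4s to implement `toJSON`, so it is already on a Spark application's classpath; the exact version depends on the Spark release). `LabelChildren.addLabel` is a hypothetical helper: it appends a "label" field to every node object inside any "child" array and leaves integer child indexes such as "child":0 untouched.

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods._

object LabelChildren {
  // Appends "label": <label> to each node object found in a "child" array.
  // Integer-valued "child" fields (child indexes) do not match and are kept as-is.
  def addLabel(planJson: String, label: String): String = {
    val labelled = parse(planJson).transformField {
      case ("child", JArray(nodes)) =>
        ("child", JArray(nodes.map {
          case JObject(fields) => JObject(fields :+ ("label" -> JString(label)))
          case other => other
        }))
    }
    compact(render(labelled))
  }

  def main(args: Array[String]): Unit = {
    // Shortened excerpt of the plan above.
    val sample =
      """[{"class":"InMemoryRelation","num-children":0,"child":[{"class":"WholeStageCodegenExec","num-children":1,"child":0,"codegenStageId":2}]}]"""
    println(addLabel(sample, "it a cute child"))
  }
}
```

With a real plan, the call would be `LabelChildren.addLabel(df.queryExecution.optimizedPlan.toJSON, "it a cute child")`; only the string is changed, not the plan.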

Any changes you make this way will not be reflected in the execution plan itself.
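If the information really has to be attached to the plan nodes before `toJSON` is called, Spark 3.x tree nodes carry a mutable tag map (`TreeNodeTag`, `setTagValue`, `getTagValue`). Tags are not constructor fields, so `toJSON` does not serialize them, and they still have to be merged into the JSON afterwards. The sketch below, assuming Spark 3.0+ and json4s, tags every node of a small optimized plan and merges the tags by position, relying on `collect` and `toJSON` both walking `children` in pre-order. The tag name "label" and the helper `TaggedPlanJson.labelledJson` are hypothetical; nodes that are only reachable through a field (such as the cached physical plan inside InMemoryRelation) are visited by neither traversal and are therefore not labelled.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.trees.TreeNodeTag
import org.json4s._
import org.json4s.jackson.JsonMethods._

object TaggedPlanJson {
  // Hypothetical tag holding the user information for each node.
  val Label: TreeNodeTag[String] = TreeNodeTag[String]("label")

  // Serializes `plan` with toJSON and appends each node's Label tag (if set) as a "label" field.
  def labelledJson(plan: LogicalPlan): String = {
    // Pre-order, matching the flat node list produced by toJSON.
    val labels: Seq[Option[String]] = plan.collect { case node => node.getTagValue(Label) }
    val merged = parse(plan.toJSON) match {
      case JArray(nodes) =>
        JArray(nodes.zip(labels).map {
          case (JObject(fields), Some(label)) => JObject(fields :+ ("label" -> JString(label)))
          case (other, _) => other
        })
      case other => other
    }
    compact(render(merged))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("tagged-plan-json").getOrCreate()
    try {
      val plan = spark.range(10).filter("id > 3").queryExecution.optimizedPlan
      // Attach user information to every node; this only touches the nodes' tag maps.
      plan.foreach(node => node.setTagValue(Label, s"it a cute ${node.nodeName}"))
      println(labelledJson(plan))
    } finally {
      spark.stop()
    }
  }
}
```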