How to overwrite pyspark DataFrame schema without data scan?

This question is related to another one. Say I have a pyspark DataFrame with a certain schema, and I would like to overwrite that schema with a new schema that I know is compatible. I could do:

df: DataFrame
new_schema = ...

df.rdd.toDF(schema=new_schema)

Unfortunately this triggers the computation described in the link above. Is there a way to do this at the metadata level (or lazily), without eagerly triggering the computation or any transformations?
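
Note: for top-level columns there seems to be a metadata-level alternative via Column.alias with its metadata keyword argument, which stays lazy, but as far as I can tell it cannot reach nested fields (like the ia field in the schemas below). A minimal sketch, using the ob field from the example schemas below:

from pyspark.sql import functions as F

# Attach metadata to a top-level column without touching the data:
# alias(..., metadata=...) only rewrites the column's attribute in the
# plan, so nothing is computed eagerly. Nested fields (e.g. structs
# inside arrays) cannot be addressed this way.
df = df.withColumn(
    "ob", F.col("ob").alias("ob", metadata={"description": "this is ob desc"}))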

Edit, note:

I've looked into this a bit myself, and I'm curious about your opinion on my workaround/POC. See https://github.com/ravwojdyla/spark-schema-utils. It transforms the expressions and updates the attributes.

Say I have two schemas. The first one without any metadata, let's call it schema_wo_metadata:

{
  "fields": [
    {
      "metadata": {},
      "name": "oa",
      "nullable": false,
      "type": {
        "containsNull": true,
        "elementType": {
          "fields": [
            {
              "metadata": {},
              "name": "ia",
              "nullable": false,
              "type": "long"
            },
            {
              "metadata": {},
              "name": "ib",
              "nullable": false,
              "type": "string"
            }
          ],
          "type": "struct"
        },
        "type": "array"
      }
    },
    {
      "metadata": {},
      "name": "ob",
      "nullable": false,
      "type": "double"
    }
  ],
  "type": "struct"
}
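
(For reference, such a schema can be rebuilt from its JSON form via StructType.fromJson. A sketch, assuming the JSON document above is stored in a string schema_wo_metadata_json:)

import json
from pyspark.sql.types import StructType

# Rebuild the StructType from the JSON document above; the name
# `schema_wo_metadata_json` is an assumption for this sketch.
schema_wo_metadata = StructType.fromJson(json.loads(schema_wo_metadata_json))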

The second one with extra metadata on the inner (ia) field and the outer (ob) field, let's call it schema_wi_metadata:

{
  "fields": [
    {
      "metadata": {},
      "name": "oa",
      "nullable": false,
      "type": {
        "containsNull": true,
        "elementType": {
          "fields": [
            {
              "metadata": {
                "description": "this is ia desc"
              },
              "name": "ia",
              "nullable": false,
              "type": "long"
            },
            {
              "metadata": {},
              "name": "ib",
              "nullable": false,
              "type": "string"
            }
          ],
          "type": "struct"
        },
        "type": "array"
      }
    },
    {
      "metadata": {
        "description": "this is ob desc"
      },
      "name": "ob",
      "nullable": false,
      "type": "double"
    }
  ],
  "type": "struct"
}
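
(The two schemas are meant to be identical up to metadata. A quick sketch to verify that, assuming both are loaded as StructType objects as in the snippet below, comparing their JSON forms with all metadata entries stripped:)

import json

def strip_metadata(node):
    # Recursively drop "metadata" entries from a schema's JSON representation
    if isinstance(node, dict):
        return {k: strip_metadata(v) for k, v in node.items() if k != "metadata"}
    if isinstance(node, list):
        return [strip_metadata(v) for v in node]
    return node

assert (strip_metadata(json.loads(schema_wo_metadata.json()))
        == strip_metadata(json.loads(schema_wi_metadata.json())))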

Now say I have a dataset with the schema_wo_metadata schema, and want to swap that schema with schema_wi_metadata:

from pyspark.sql import SparkSession
from pyspark.sql import Row, DataFrame
from pyspark.sql.types import StructType


# I assume these get generated/specified somewhere
schema_wo_metadata: StructType = ...
schema_wi_metadata: StructType = ...

# You need my extra package
spark = SparkSession.builder \
    .config("spark.jars.packages", "io.github.ravwojdyla:spark-schema-utils_2.12:0.1.0") \
    .getOrCreate()

# Dummy data with `schema_wo_metadata` schema:
df = spark.createDataFrame(data=[Row(oa=[Row(ia=0, ib=1)], ob=3.14),
                                 Row(oa=[Row(ia=2, ib=3)], ob=42.0)],
                           schema=schema_wo_metadata)

# Swap the schema on the JVM side: this rewrites expressions/attributes only
_jdf = spark._sc._jvm.io.github.ravwojdyla.SchemaUtils.update(df._jdf, schema_wi_metadata.json())
new_df = DataFrame(_jdf, df.sql_ctx)

Now new_df has the schema_wi_metadata schema, e.g.:

new_df.schema["oa"].dataType.elementType["ia"].metadata
# -> {'description': 'this is ia desc'}
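
As a quick sanity check that the swap is metadata-only, the rows are unchanged, and the plan can be inspected for unexpected extra stages:

# Rows are untouched by the schema swap
assert new_df.collect() == df.collect()
# Print the extended query plan to look for extra scan/shuffle stages
new_df.explain(True)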

Any opinions?