如何在不进行数据扫描的情况下覆盖 pyspark DataFrame 架构?
How to overwrite pyspark DataFrame schema without data scan?
这个问题与有关。假设我有一个具有特定模式的 pyspark DataFrame,我想用我 知道 兼容的新模式覆盖该模式,我可以这样做:
df: DataFrame
new_schema = ...
df.rdd.toDF(schema=new_schema)
不幸的是,这会触发上述 link 中所述的计算。有没有一种方法可以在元数据级别(或惰性)执行此操作,而无需急切地触发计算或转换?
编辑、备注:
- 架构可以任意复杂(嵌套等)
- 新模式包括对描述、可空性和附加元数据的更新(类型更新的奖励点)
- 我想避免编写自定义查询表达式生成器,除非Spark 中已经内置了一个可以根据模式生成查询的生成器/
StructType
我自己对此进行了深入研究,我很好奇您对我的 workaround/POC 的看法。参见 https://github.com/ravwojdyla/spark-schema-utils。它转换表达式并更新属性。
假设我有两个模式,第一个没有任何元数据,让我们调用 schema_wo_metadata
:
{
"fields": [
{
"metadata": {},
"name": "oa",
"nullable": false,
"type": {
"containsNull": true,
"elementType": {
"fields": [
{
"metadata": {},
"name": "ia",
"nullable": false,
"type": "long"
},
{
"metadata": {},
"name": "ib",
"nullable": false,
"type": "string"
}
],
"type": "struct"
},
"type": "array"
}
},
{
"metadata": {},
"name": "ob",
"nullable": false,
"type": "double"
}
],
"type": "struct"
}
第二个在内部 (ia
) 字段和外部 (ob
) 字段上有额外的元数据,我们称之为 schema_wi_metadata
{
"fields": [
{
"metadata": {},
"name": "oa",
"nullable": false,
"type": {
"containsNull": true,
"elementType": {
"fields": [
{
"metadata": {
"description": "this is ia desc"
},
"name": "ia",
"nullable": false,
"type": "long"
},
{
"metadata": {},
"name": "ib",
"nullable": false,
"type": "string"
}
],
"type": "struct"
},
"type": "array"
}
},
{
"metadata": {
"description": "this is ob desc"
},
"name": "ob",
"nullable": false,
"type": "double"
}
],
"type": "struct"
}
现在假设我有一个具有 schema_wo_metadata
架构的数据集,并且想要将架构与 schema_wi_metadata
:
交换
from pyspark.sql import SparkSession
from pyspark.sql import Row, DataFrame
from pyspark.sql.types import StructType
# I assume these get generate/specified somewhere
schema_wo_metadata: StructType = ...
schema_wi_metadata: StructType = ...
# You need my extra package
spark = SparkSession.builder \
.config("spark.jars.packages", "io.github.ravwojdyla:spark-schema-utils_2.12:0.1.0") \
.getOrCreate()
# Dummy data with `schema_wo_metadata` schema:
df = spark.createDataFrame(data=[Row(oa=[Row(ia=0, ib=1)], ob=3.14),
Row(oa=[Row(ia=2, ib=3)], ob=42.0)],
schema=schema_wo_metadata)
_jdf = spark._sc._jvm.io.github.ravwojdyla.SchemaUtils.update(df._jdf, schema.json())
new_df = DataFrame(_jdf, df.sql_ctx)
现在 new_df
有 schema_wi_metadata
,例如:
new_df.schema["oa"].dataType.elementType["ia"].metadata
# -> {'description': 'this is ia desc'}
有什么意见吗?
这个问题与
df: DataFrame
new_schema = ...
df.rdd.toDF(schema=new_schema)
不幸的是,这会触发上述 link 中所述的计算。有没有一种方法可以在元数据级别(或惰性)执行此操作,而无需急切地触发计算或转换?
编辑、备注:
- 架构可以任意复杂(嵌套等)
- 新模式包括对描述、可空性和附加元数据的更新(类型更新的奖励点)
- 我想避免编写自定义查询表达式生成器,除非Spark 中已经内置了一个可以根据模式生成查询的生成器/
StructType
我自己对此进行了深入研究,我很好奇您对我的 workaround/POC 的看法。参见 https://github.com/ravwojdyla/spark-schema-utils。它转换表达式并更新属性。
假设我有两个模式,第一个没有任何元数据,让我们调用 schema_wo_metadata
:
{
"fields": [
{
"metadata": {},
"name": "oa",
"nullable": false,
"type": {
"containsNull": true,
"elementType": {
"fields": [
{
"metadata": {},
"name": "ia",
"nullable": false,
"type": "long"
},
{
"metadata": {},
"name": "ib",
"nullable": false,
"type": "string"
}
],
"type": "struct"
},
"type": "array"
}
},
{
"metadata": {},
"name": "ob",
"nullable": false,
"type": "double"
}
],
"type": "struct"
}
第二个在内部 (ia
) 字段和外部 (ob
) 字段上有额外的元数据,我们称之为 schema_wi_metadata
{
"fields": [
{
"metadata": {},
"name": "oa",
"nullable": false,
"type": {
"containsNull": true,
"elementType": {
"fields": [
{
"metadata": {
"description": "this is ia desc"
},
"name": "ia",
"nullable": false,
"type": "long"
},
{
"metadata": {},
"name": "ib",
"nullable": false,
"type": "string"
}
],
"type": "struct"
},
"type": "array"
}
},
{
"metadata": {
"description": "this is ob desc"
},
"name": "ob",
"nullable": false,
"type": "double"
}
],
"type": "struct"
}
现在假设我有一个具有 schema_wo_metadata
架构的数据集,并且想要将架构与 schema_wi_metadata
:
from pyspark.sql import SparkSession
from pyspark.sql import Row, DataFrame
from pyspark.sql.types import StructType
# I assume these get generate/specified somewhere
schema_wo_metadata: StructType = ...
schema_wi_metadata: StructType = ...
# You need my extra package
spark = SparkSession.builder \
.config("spark.jars.packages", "io.github.ravwojdyla:spark-schema-utils_2.12:0.1.0") \
.getOrCreate()
# Dummy data with `schema_wo_metadata` schema:
df = spark.createDataFrame(data=[Row(oa=[Row(ia=0, ib=1)], ob=3.14),
Row(oa=[Row(ia=2, ib=3)], ob=42.0)],
schema=schema_wo_metadata)
_jdf = spark._sc._jvm.io.github.ravwojdyla.SchemaUtils.update(df._jdf, schema.json())
new_df = DataFrame(_jdf, df.sql_ctx)
现在 new_df
有 schema_wi_metadata
,例如:
new_df.schema["oa"].dataType.elementType["ia"].metadata
# -> {'description': 'this is ia desc'}
有什么意见吗?