pyspark dataframes: Why can I select some nested fields but not others?

I'm trying to write some code with pyspark (3.0.1) on Python 3.9.1 to un-nest JSON into DataFrames.

I have some dummy data with the following schema:

data.printSchema()
root
 |-- recordID: string (nullable = true)
 |-- customerDetails: struct (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- dob: string (nullable = true)
 |-- familyMembers: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- relationship: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- contactNumbers: struct (nullable = true)
 |    |    |    |-- work: string (nullable = true)
 |    |    |    |-- home: string (nullable = true)
 |    |    |-- addressDetails: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- addressType: string (nullable = true)
 |    |    |    |    |-- address: string (nullable = true)

When I select fields from familyMembers, I get the following results, as expected:

data.select('familyMembers.contactNumbers.work').show(truncate=False)
+------------------------------------------------+
|work                                            |
+------------------------------------------------+
|[(07) 4612 3880, (03) 5855 2377, (07) 4979 1871]|
|[(07) 4612 3880, (03) 5855 2377]                |
+------------------------------------------------+

data.select('familyMembers.name').show(truncate=False)
+------------------------------------+
|name                                |
+------------------------------------+
|[Jane Smith, Bob Smith, Simon Smith]|
|[Jackie Sacamano, Simon Sacamano]   |
+------------------------------------+

However, when I try to select a field from the addressDetails ArrayType (nested under familyMembers), I get an error:

>>> data.select('familyMembers.addressDetails.address').show(truncate=False)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python3.9/site-packages/pyspark/sql/dataframe.py", line 1421, in select
    jdf = self._jdf.select(self._jcols(*cols))
  File "/usr/local/lib/python3.9/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/usr/local/lib/python3.9/site-packages/pyspark/sql/utils.py", line 134, in deco
    raise_from(converted)
  File "<string>", line 3, in raise_from
pyspark.sql.utils.AnalysisException: cannot resolve '`familyMembers`.`addressDetails`['address']' due to data type mismatch: argument 2 requires integral type, however, ''address'' is of string type.;;
'Project [familyMembers#71.addressDetails[address] AS address#277]
+- LogicalRDD [recordID#69, customerDetails#70, familyMembers#71], false

I'm confused. Both familyMembers and addressDetails are ArrayTypes, yet selecting from one works while selecting from the other doesn't. Is there an explanation for this, or am I missing something? Is it because one is nested inside the other?

Code to reproduce (with just 1 record):

from pyspark.sql.types import StructType
from pyspark.sql import SparkSession, DataFrame
import json

rawdata = [{"recordID":"abc-123","customerDetails":{"name":"John Smith","dob":"1980-04-23"},"familyMembers":[{"relationship":"mother","name":"Jane Smith","contactNumbers":{"work":"(07) 4612 3880","home":"(08) 8271 1577"},"addressDetails":[{"addressType":"residential","address":"29 Commonwealth St, Clifton, QLD 4361 "},{"addressType":"work","address":"20 A Yeo Ave, Highgate, SA 5063 "}]},{"relationship":"father","name":"Bob Smith","contactNumbers":{"work":"(03) 5855 2377","home":"(03) 9773 2483"},"addressDetails":[{"addressType":"residential","address":"1735 Fenaughty Rd, Kyabram South, VIC 3620"},{"addressType":"work","address":"12 Haldane St, Bonbeach, VIC 3196 "}]},{"relationship":"brother","name":"Simon Smith","contactNumbers":{"work":"(07) 4979 1871","home":"(08) 9862 6017"},"addressDetails":[{"addressType":"residential","address":"6 Darren St, Sun Valley, QLD 4680"},{"addressType":"work","address":"Arthur River, WA 6315"}]}]},]
strschema = '{"fields":[{"metadata":{},"name":"recordID","nullable":true,"type":"string"},{"metadata":{},"name":"customerDetails","nullable":true,"type":{"fields":[{"metadata":{},"name":"name","nullable":true,"type":"string"},{"metadata":{},"name":"dob","nullable":true,"type":"string"}],"type":"struct"}},{"metadata":{},"name":"familyMembers","nullable":true,"type":{"containsNull":true,"elementType":{"fields":[{"metadata":{},"name":"relationship","nullable":true,"type":"string"},{"metadata":{},"name":"name","nullable":true,"type":"string"},{"metadata":{},"name":"contactNumbers","nullable":true,"type":{"fields":[{"metadata":{},"name":"work","nullable":true,"type":"string"},{"metadata":{},"name":"home","nullable":true,"type":"string"}],"type":"struct"}},{"metadata":{},"name":"addressDetails","nullable":true,"type":{"containsNull":true,"elementType":{"fields":[{"metadata":{},"name":"addressType","nullable":true,"type":"string"},{"metadata":{},"name":"address","nullable":true,"type":"string"}],"type":"struct"},"type":"array"}}],"type":"struct"},"type":"array"}}],"type":"struct"}'

spark = SparkSession.builder.appName("json-un-nester").enableHiveSupport().getOrCreate()
sc = spark.sparkContext

schema = StructType.fromJson(json.loads(strschema))

datardd = sc.parallelize(rawdata)
data = spark.createDataFrame(datardd, schema=schema)

data.show()
data.select('familyMembers.name').show(truncate=False)
data.select('familyMembers.addressDetails.address').show(truncate=False)  # raises the AnalysisException

To understand this, you can print the schema of the selection:

data.select('familyMembers.addressDetails').printSchema()

#root
# |-- familyMembers.addressDetails: array (nullable = true)
# |    |-- element: array (containsNull = true)
# |    |    |-- element: struct (containsNull = true)
# |    |    |    |-- addressType: string (nullable = true)
# |    |    |    |-- address: string (nullable = true)

As you can see, here you have an array of arrays of structs, which is different from the initial schema you had. So you can't access address directly from the root, but you can select the first element of the nested array and then access the struct field address:

data.selectExpr("familyMembers.addressDetails[0].address").show(truncate=False)

#+--------------------------------------------------------------------------+
#|familyMembers.addressDetails AS addressDetails#29[0].address              |
#+--------------------------------------------------------------------------+
#|[29 Commonwealth St, Clifton, QLD 4361 , 20 A Yeo Ave, Highgate, SA 5063 ]|
#+--------------------------------------------------------------------------+

Or:

from pyspark.sql import functions as F

data.select(F.col('familyMembers.addressDetails').getItem(0).getItem("address"))
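
Not from the original answers, but as an additional sketch: Spark 2.4+ also provides flatten, which collapses the array of arrays into a single array of structs so address can be selected across all family members at once (the allAddressDetails alias is just for illustration):

from pyspark.sql import functions as F

# flatten array<array<struct>> into array<struct>, then pull out the nested field
data.select(
    F.flatten('familyMembers.addressDetails').alias('allAddressDetails')
).select('allAddressDetails.address').show(truncate=False)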

Alongside the answer provided by @blackbishop, you can also use expr inside select to get the output below:

from pyspark.sql.functions import expr

data.select(expr('familyMembers.addressDetails[0].address'))

Output: the same addresses as the selectExpr example above.

If you need all the addresses, you can also use explode to get them, as shown below:

from pyspark.sql.functions import explode

data.select(explode('familyMembers.addressDetails')).select("col.address")

Output: one row per family member, each holding that member's list of addresses.
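
For completeness, here is a sketch (not part of the answers above) that explodes twice to get one row per address, keeping each family member's name next to it; the member and addr aliases are purely illustrative:

from pyspark.sql.functions import explode

# first explode the familyMembers array, then each member's addressDetails array
(data
    .select(explode('familyMembers').alias('member'))
    .select('member.name', explode('member.addressDetails').alias('addr'))
    .select('name', 'addr.addressType', 'addr.address')
    .show(truncate=False))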