Pyspark:在分解数组后选择一个值

Pyspark: Selecting a value after exploding an array

我是 pyspark 的新手,正在尝试解析 telecom.value if telecom.system = "fax|phone" 但遇到以下错误。我知道 filter() 会 return 我是一个结构,我正在 select 从中获取一个列。我如何 select 调用 filter() 后的列值?

raise_from 中的文件“”,第 3 行 pyspark.sql.utils.AnalysisException:运营商 !Project [name#3.family AS Practitioner_LastName# 中的 name#3、telecom#5、address#7 中缺少已解决的属性 telecom#27、telecom#33 23,姓名#3.suffix AS Practitioner_NameSuffix#24,姓名#3.given[0] AS Practitioner_FirstName#25,电信#27.value AS telecom.value #42,电信#33.value AS telecom.value#43,地址#7.city AS PractitionerCity#38,地址#7.line[0] AS PractitionerAddress_1#39,地址#7.postalCode AS PractitionerZip#40,地址#7.state AS PractitionerState#41]。具有相同名称的属性出现在操作中:telecom,telecom。请检查是否使用了正确的属性。

root
 |-- resource: struct (nullable = true)
 |    |-- address: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- city: string (nullable = true)
 |    |    |    |-- country: string (nullable = true)
 |    |    |    |-- line: array (nullable = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |-- postalCode: string (nullable = true)
 |    |    |    |-- state: string (nullable = true)
 |    |    |    |-- use: string (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- identifier: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- type: struct (nullable = true)
 |    |    |    |    |-- coding: array (nullable = true)
 |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |-- code: string (nullable = true)
 |    |    |    |    |    |    |-- system: string (nullable = true)
 |    |    |    |-- use: string (nullable = true)
 |    |    |    |-- value: string (nullable = true)
 |    |-- name: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- family: string (nullable = true)
 |    |    |    |-- given: array (nullable = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |-- suffix: array (nullable = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |-- use: string (nullable = true)
 |    |-- resourceType: string (nullable = true)
 |    |-- telecom: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- system: string (nullable = true)
 |    |    |    |-- use: string (nullable = true)
 |    |    |    |-- value: string (nullable = true)
 |    |-- text: struct (nullable = true)
 |    |    |-- div: string (nullable = true)
 |    |    |-- status: string (nullable = true)

import sys
import pyspark
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
import pyspark.sql.functions as f

appName = "PySpark Example - JSON file to Spark Data Frame"
master = "local"
spark = SparkSession.builder.appName(appName).master(master).getOrCreate()

json_file_path = 'C:\Users\M\Documents\Practitioner.json'
source_df = spark.read.json(json_file_path, multiLine=True)

source_df.printSchema()
output = source_df.select(source_df["resource.name"][0].alias("name"), 
                        source_df["resource.telecom"].alias("telecom"),
                        source_df["resource.address"][0].alias("address"))
output.printSchema()

practitioner = output.select(
    output.name.family.alias("Practitioner_LastName"),
    output.name.suffix.alias("Practitioner_NameSuffix"),
    output.name.given[0].alias("Practitioner_FirstName"),
    output.withColumn("telecom", f.explode(f.col("telecom"))).filter(f.col("telecom.system") == "phone").telecom.value,
    output.withColumn("telecom", f.explode(f.col("telecom"))).filter(f.col("telecom.system") == "fax").telecom.value,
    output.address.city.alias("PractitionerCity"),
    output.address.line[0].alias("PractitionerAddress_1"), 
    output.address.postalCode.alias("PractitionerZip"), 
    output.address.state.alias("PractitionerState")
)                    
        
practitioner.printSchema()
practitioner.show()

我的 json 是: {"resource":{"resourceType":"Practitioner","id":"scm-ambqa1821624401190","text":{"status":"generated","div":""},"identifier":[{"use":"official","type":{"coding":[{"system":"http:\/\/hl7.org\/fhir\/v2\/0203","code":"NPI"}]},"value":"1548206097"},{"use":"official","type":{"coding":[{"system":"http:\/\/hl7.org\/fhir\/v2\/0203","code":"DEA"}]},"value":"HB1548206"}],"name":[{"use":"official","family":"BERNSTEIN","given":["HELENE","B"],"suffix":["MD"]}],"telecom":[{"system":"phone","value":"6106547854","use":"work"},{"system":"email","value":"sachin.belhekar@allscripts.com","use":"work"},{"system":"fax","value":"7106547895","use":"work"}],"address":[{"use":"work","line":["West Street 1","West Street 2"],"city":"Michigan","state":"MI","postalCode":"49036","country":"USA"}]}}

数据结构有点复杂,我会用一个UDF来解析:

import pyspark.sql.functions as f
import pyspark.sql.types as t

@f.udf(t.StringType())
def phone_parser(row):
    for item in row:
        if item['system'] == 'phone':
            return item['value']
        
@f.udf(t.StringType())
def fax_parser(row):
    for item in row:
        if item['system'] == 'fax':
            return item['value']

output.select(phone_parser('telecom'), fax_parser('telecom'))