Pyspark: Selecting a value after exploding an array
I am new to pyspark and am trying to parse telecom.value where telecom.system = "fax|phone", but I run into the error below. I know that filter() returns me a struct, and I am selecting a column from it. How do I select the column value after calling filter()?
File "", line 3, in raise_from
pyspark.sql.utils.AnalysisException: Resolved attribute(s) telecom#27,telecom#33 missing from name#3,telecom#5,address#7 in operator !Project [name#3.family AS Practitioner_LastName#23, name#3.suffix AS Practitioner_NameSuffix#24, name#3.given[0] AS Practitioner_FirstName#25, telecom#27.value AS telecom.value#42, telecom#33.value AS telecom.value#43, address#7.city AS PractitionerCity#38, address#7.line[0] AS PractitionerAddress_1#39, address#7.postalCode AS PractitionerZip#40, address#7.state AS PractitionerState#41]. Attribute(s) with the same name appear in the operation: telecom,telecom. Please check if the right attribute(s) are used.
root
|-- resource: struct (nullable = true)
| |-- address: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- city: string (nullable = true)
| | | |-- country: string (nullable = true)
| | | |-- line: array (nullable = true)
| | | | |-- element: string (containsNull = true)
| | | |-- postalCode: string (nullable = true)
| | | |-- state: string (nullable = true)
| | | |-- use: string (nullable = true)
| |-- id: string (nullable = true)
| |-- identifier: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- type: struct (nullable = true)
| | | | |-- coding: array (nullable = true)
| | | | | |-- element: struct (containsNull = true)
| | | | | | |-- code: string (nullable = true)
| | | | | | |-- system: string (nullable = true)
| | | |-- use: string (nullable = true)
| | | |-- value: string (nullable = true)
| |-- name: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- family: string (nullable = true)
| | | |-- given: array (nullable = true)
| | | | |-- element: string (containsNull = true)
| | | |-- suffix: array (nullable = true)
| | | | |-- element: string (containsNull = true)
| | | |-- use: string (nullable = true)
| |-- resourceType: string (nullable = true)
| |-- telecom: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- system: string (nullable = true)
| | | |-- use: string (nullable = true)
| | | |-- value: string (nullable = true)
| |-- text: struct (nullable = true)
| | |-- div: string (nullable = true)
| | |-- status: string (nullable = true)
import sys
import pyspark
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
import pyspark.sql.functions as f

appName = "PySpark Example - JSON file to Spark Data Frame"
master = "local"
spark = SparkSession.builder.appName(appName).master(master).getOrCreate()

json_file_path = r'C:\Users\M\Documents\Practitioner.json'
source_df = spark.read.json(json_file_path, multiLine=True)
source_df.printSchema()

output = source_df.select(source_df["resource.name"][0].alias("name"),
                          source_df["resource.telecom"].alias("telecom"),
                          source_df["resource.address"][0].alias("address"))
output.printSchema()

practitioner = output.select(
    output.name.family.alias("Practitioner_LastName"),
    output.name.suffix.alias("Practitioner_NameSuffix"),
    output.name.given[0].alias("Practitioner_FirstName"),
    output.withColumn("telecom", f.explode(f.col("telecom"))).filter(f.col("telecom.system") == "phone").telecom.value,
    output.withColumn("telecom", f.explode(f.col("telecom"))).filter(f.col("telecom.system") == "fax").telecom.value,
    output.address.city.alias("PractitionerCity"),
    output.address.line[0].alias("PractitionerAddress_1"),
    output.address.postalCode.alias("PractitionerZip"),
    output.address.state.alias("PractitionerState")
)
practitioner.printSchema()
practitioner.show()
My JSON is:
{"resource":{"resourceType":"Practitioner","id":"scm-ambqa1821624401190","text":{"status":"generated","div":""},"identifier":[{"use":"official","type":{"coding":[{"system":"http:\/\/hl7.org\/fhir\/v2\/0203","code":"NPI"}]},"value":"1548206097"},{"use":"official","type":{"coding":[{"system":"http:\/\/hl7.org\/fhir\/v2\/0203","code":"DEA"}]},"value":"HB1548206"}],"name":[{"use":"official","family":"BERNSTEIN","given":["HELENE","B"],"suffix":["MD"]}],"telecom":[{"system":"phone","value":"6106547854","use":"work"},{"system":"email","value":"sachin.belhekar@allscripts.com","use":"work"},{"system":"fax","value":"7106547895","use":"work"}],"address":[{"use":"work","line":["West Street 1","West Street 2"],"city":"Michigan","state":"MI","postalCode":"49036","country":"USA"}]}}
The data structure is a bit complex, so I would use a UDF to parse it:
import pyspark.sql.functions as f
import pyspark.sql.types as t

@f.udf(t.StringType())
def phone_parser(row):
    # Scan the telecom array and return the value of the first phone entry
    for item in row:
        if item['system'] == 'phone':
            return item['value']

@f.udf(t.StringType())
def fax_parser(row):
    # Scan the telecom array and return the value of the first fax entry
    for item in row:
        if item['system'] == 'fax':
            return item['value']

output.select(phone_parser('telecom'), fax_parser('telecom'))
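
If you are on Spark 2.4 or later, the same lookup can also be done without a Python UDF by using the built-in higher-order filter function through f.expr. A minimal sketch, assuming the output DataFrame defined above (the Practitioner_Phone/Practitioner_Fax aliases are just illustrative names):

import pyspark.sql.functions as f

# Alternative sketch (Spark 2.4+): filter the telecom array in SQL,
# take the first matching element, and pull out its value field.
practitioner = output.select(
    f.expr("filter(telecom, x -> x.system = 'phone')[0].value").alias("Practitioner_Phone"),
    f.expr("filter(telecom, x -> x.system = 'fax')[0].value").alias("Practitioner_Fax"),
)
practitioner.show()

With the sample JSON above this should give 6106547854 for the phone and 7106547895 for the fax, and it avoids exploding the array or shipping each row through Python.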