Creating a table with array&lt;string&gt; and string columns in PySpark
I have a dataset like the one below.
+-------------------------------------------------------+-------------------------------------------------------------------+---------------------------------------------------------------------------------------+-------------+---------------------------------------------------------+-------------+
|emp_id |sik_id |modification_date |file_name |org_path |received_date|
+-------------------------------------------------------+-------------------------------------------------------------------+---------------------------------------------------------------------------------------+-------------+---------------------------------------------------------+-------------+
|[85627230-s387s09, 98722016-s015s05, 40022035-s008s21] |[f13c1320-5c8f3daas5cd, f13c1384-6659-4831, 4831-aaf1-5c8f3da] |[2021-04-19T11:43:32.617953Z, 2021-04-19T11:43:32.858290Z, 2021-04-19T11:43:34.027082Z]|test1.json |s3://my-bucket/test_prefix/vualt2/rcvd_dt=2022-01-24/ |2022-01-25 |
|[67dm34-4334, 8723gv6-2022, 6f7m99-2244-ki856] |[66d9-4888-aaf1, aaf1-5c8f3da1d5cd, f13c1884-66d9] |[2020-11-12T23:22:05.433107Z, 2020-11-12T20:16:51.339437Z, 2020-11-11T20:59:03.758126Z]|test2.json |s3://my-bucket/test_prefix/vualt2/rcvd_dt=2022-01-25/ |2022-01-25 |
+-------------------------------------------------------+-------------------------------------------------------------------+---------------------------------------------------------------------------------------+-------------+---------------------------------------------------------+-------------+
Its schema contains the following array and string fields:
>>> df.printSchema()
root
|-- emp_id: array (nullable = true)
| |-- element: string (containsNull = true)
|-- sik_id: array (nullable = true)
| |-- element: string (containsNull = true)
|-- modification_date: array (nullable = true)
| |-- element: string (containsNull = true)
|-- file_name: string (nullable = false)
|-- org_path: string (nullable = false)
|-- received_date: string (nullable = false)
I want the result shown below, where each emp_id, sik_id, and modification_date is paired with the correct file_name, org_path, and received_date:
+-----------------+--------------------------+-----------------------------+-------------+---------------------------------------------------------+-------------+
|emp_id |sik_id |modification_date |file_name |org_path |received_date|
+-----------------+--------------------------+-----------------------------+-------------+---------------------------------------------------------+-------------+
|85627230-s387s09 |f13c1320-5c8f3daas5cd |2021-04-19T11:43:32.617953Z |test1.json |s3://my-bucket/test_prefix/vualt2/rcvd_dt=2022-01-24/ |2022-01-25 |
|98722016-s015s05 |f13c1384-6659-4831 |2021-04-19T11:43:32.858290Z |test1.json |s3://my-bucket/test_prefix/vualt2/rcvd_dt=2022-01-24/ |2022-01-25 |
|40022035-s008s21 |4831-aaf1-5c8f3da |2021-04-19T11:43:34.027082Z |test1.json |s3://my-bucket/test_prefix/vualt2/rcvd_dt=2022-01-24/ |2022-01-25 |
|67dm34-4334 |66d9-4888-aaf1 |2020-11-12T23:22:05.433107Z |test2.json |s3://my-bucket/test_prefix/vualt2/rcvd_dt=2022-01-25/ |2022-01-25 |
|8723gv6-2022 |aaf1-5c8f3da1d5cd |2020-11-12T20:16:51.339437Z |test2.json |s3://my-bucket/test_prefix/vualt2/rcvd_dt=2022-01-25/ |2022-01-25 |
|6f7m99-2244-ki856|f13c1884-66d9 |2020-11-11T20:59:03.758126Z |test2.json |s3://my-bucket/test_prefix/vualt2/rcvd_dt=2022-01-25/ |2022-01-25 |
+-----------------+--------------------------+-----------------------------+-------------+---------------------------------------------------------+-------------+
I tried using zip() on these fields, but it seems zip doesn't work with a mix of array and string fields; I get a type-mismatch exception.
Can someone help me find the right solution?
Thanks in advance.
Try explode:
from pyspark.sql.functions import explode
arrayData = [(['85627230-s387s09', '98722016-s015s05', '40022035-s008s21'], 'test1.json')]
df = spark.createDataFrame(data=arrayData, schema = ['emp_id', 'file_name'])
df2 = df.select(df.file_name, explode(df.emp_id))
df2.printSchema()
df2.show()
Combine the SQL functions arrays_zip and inline:
df = df.selectExpr('inline(arrays_zip(emp_id, sik_id, modification_date))', 'file_name', 'org_path', 'received_date')
df.show(truncate=False)
You can use arrays_zip in addition to explode:
Data preparation
import pandas as pd
from pyspark.sql import functions as F

d = {
'emp_id':[
['85627230-s387s09', '98722016-s015s05', '40022035-s008s21'],
['67dm34-4334', '8723gv6-2022', '6f7m99-2244-ki856']
] ,
'sik_id':[
['f13c1320-5c8f3daas5cd', 'f13c1384-6659-4831', '4831-aaf1-5c8f3da'],
['66d9-4888-aaf1', 'aaf1-5c8f3da1d5cd', 'f13c1884-66d9']
],
'modification_date':[
['2021-04-19T11:43:32.617953Z', '2021-04-19T11:43:32.858290Z', '2021-04-19T11:43:34.027082Z'],
['2020-11-12T23:22:05.433107Z', '2020-11-12T20:16:51.339437Z', '2020-11-11T20:59:03.758126Z']
],
'file_name': ['test1.json','test2.json'],
'org_path': ['s3://my-bucket/test_prefix/vualt2/rcvd_dt=2022-01-24/','s3://my-bucket/test_prefix/vualt2/rcvd_dt=2022-01-25/'],
'received_date': ['2022-01-25','2022-01-25']
}
df = pd.DataFrame(d)
sparkDF = sql.createDataFrame(df)  # `sql` is the SparkSession (often named `spark`)
sparkDF.printSchema()
root
|-- emp_id: array (nullable = true)
| |-- element: string (containsNull = true)
|-- sik_id: array (nullable = true)
| |-- element: string (containsNull = true)
|-- modification_date: array (nullable = true)
| |-- element: string (containsNull = true)
|-- file_name: string (nullable = true)
|-- org_path: string (nullable = true)
|-- received_date: string (nullable = true)
Zip and explode the arrays
sparkDF = sparkDF.withColumn(
    "exploded_tmp",
    F.arrays_zip(F.col('emp_id'), F.col('sik_id'), F.col('modification_date'))
).withColumn("exploded", F.explode("exploded_tmp"))

sparkDF.select(
    F.col('exploded.emp_id'),
    F.col('exploded.sik_id'),
    F.col('exploded.modification_date'),
    F.col('file_name'),
    F.col('org_path'),
    F.col('received_date')
).show()
+-----------------+--------------------+--------------------+----------+--------------------+-------------+
| emp_id| sik_id| modification_date| file_name| org_path|received_date|
+-----------------+--------------------+--------------------+----------+--------------------+-------------+
| 85627230-s387s09|f13c1320-5c8f3daa...|2021-04-19T11:43:...|test1.json|s3://my-bucket/te...| 2022-01-25|
| 98722016-s015s05| f13c1384-6659-4831|2021-04-19T11:43:...|test1.json|s3://my-bucket/te...| 2022-01-25|
| 40022035-s008s21| 4831-aaf1-5c8f3da|2021-04-19T11:43:...|test1.json|s3://my-bucket/te...| 2022-01-25|
| 67dm34-4334| 66d9-4888-aaf1|2020-11-12T23:22:...|test2.json|s3://my-bucket/te...| 2022-01-25|
| 8723gv6-2022| aaf1-5c8f3da1d5cd|2020-11-12T20:16:...|test2.json|s3://my-bucket/te...| 2022-01-25|
|6f7m99-2244-ki856| f13c1884-66d9|2020-11-11T20:59:...|test2.json|s3://my-bucket/te...| 2022-01-25|
+-----------------+--------------------+--------------------+----------+--------------------+-------------+