Creating a table with array<string> and string columns in PySpark

I have the following dataset.

+-------------------------------------------------------+-------------------------------------------------------------------+---------------------------------------------------------------------------------------+-------------+---------------------------------------------------------+-------------+
|emp_id                                                 |sik_id                                                             |modification_date                                                                      |file_name    |org_path                                                 |received_date|
+-------------------------------------------------------+-------------------------------------------------------------------+---------------------------------------------------------------------------------------+-------------+---------------------------------------------------------+-------------+
|[85627230-s387s09, 98722016-s015s05, 40022035-s008s21] |[f13c1320-5c8f3daas5cd, f13c1384-6659-4831, 4831-aaf1-5c8f3da]     |[2021-04-19T11:43:32.617953Z, 2021-04-19T11:43:32.858290Z, 2021-04-19T11:43:34.027082Z]|test1.json   |s3://my-bucket/test_prefix/vualt2/rcvd_dt=2022-01-24/    |2022-01-25   |
|[67dm34-4334, 8723gv6-2022, 6f7m99-2244-ki856]         |[66d9-4888-aaf1, aaf1-5c8f3da1d5cd, f13c1884-66d9]                 |[2020-11-12T23:22:05.433107Z, 2020-11-12T20:16:51.339437Z, 2020-11-11T20:59:03.758126Z]|test2.json   |s3://my-bucket/test_prefix/vualt2/rcvd_dt=2022-01-25/    |2022-01-25   |
+-------------------------------------------------------+-------------------------------------------------------------------+---------------------------------------------------------------------------------------+-------------+---------------------------------------------------------+-------------+

Its schema contains the following array and string fields:

>>> df.printSchema()
root
 |-- emp_id: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- sik_id: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- modification_date: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- file_name: string (nullable = false)
 |-- org_path: string (nullable = false)
 |-- received_date: string (nullable = false)

I want the result shown below, where each emp_id, sik_id, and modification_date is paired with the correct file_name, org_path, and received_date.

+-----------------+--------------------------+-----------------------------+-------------+---------------------------------------------------------+-------------+
|emp_id           |sik_id                    |modification_date            |file_name    |org_path                                                 |received_date|
+-----------------+--------------------------+-----------------------------+-------------+---------------------------------------------------------+-------------+
|85627230-s387s09 |f13c1320-5c8f3daas5cd     |2021-04-19T11:43:32.617953Z  |test1.json   |s3://my-bucket/test_prefix/vualt2/rcvd_dt=2022-01-24/    |2022-01-25   |
|98722016-s015s05 |f13c1384-6659-4831        |2021-04-19T11:43:32.858290Z  |test1.json   |s3://my-bucket/test_prefix/vualt2/rcvd_dt=2022-01-24/    |2022-01-25   |
|40022035-s008s21 |4831-aaf1-5c8f3da         |2021-04-19T11:43:34.027082Z  |test1.json   |s3://my-bucket/test_prefix/vualt2/rcvd_dt=2022-01-24/    |2022-01-25   |
|67dm34-4334      |66d9-4888-aaf1            |2020-11-12T23:22:05.433107Z  |test2.json   |s3://my-bucket/test_prefix/vualt2/rcvd_dt=2022-01-25/    |2022-01-25   |
|8723gv6-2022     |aaf1-5c8f3da1d5cd         |2020-11-12T20:16:51.339437Z  |test2.json   |s3://my-bucket/test_prefix/vualt2/rcvd_dt=2022-01-25/    |2022-01-25   |
|6f7m99-2244-ki856|f13c1884-66d9             |2020-11-11T20:59:03.758126Z  |test2.json   |s3://my-bucket/test_prefix/vualt2/rcvd_dt=2022-01-25/    |2022-01-25   |
+-----------------+--------------------------+-----------------------------+-------------+---------------------------------------------------------+-------------+

I tried using zip() on these fields, but zip does not seem to work across array and string columns: I get a type mismatch exception.

Can someone help me find the right solution?

Thanks in advance.

Try explode:

from pyspark.sql.functions import explode

# Small reproduction: one array column plus one string column
arrayData = [(['85627230-s387s09', '98722016-s015s05', '40022035-s008s21'], 'test1.json')]
df = spark.createDataFrame(data=arrayData, schema=['emp_id', 'file_name'])

# explode() emits one row per array element; the scalar file_name
# column is repeated on every row
df2 = df.select(df.file_name, explode(df.emp_id))
df2.printSchema()
df2.show()
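
Run against that sample row, this should print something like:

root
 |-- file_name: string (nullable = true)
 |-- col: string (nullable = true)

+----------+----------------+
| file_name|             col|
+----------+----------------+
|test1.json|85627230-s387s09|
|test1.json|98722016-s015s05|
|test1.json|40022035-s008s21|
+----------+----------------+

Note that this handles a single array column only; to explode several arrays in lockstep (so that emp_id, sik_id, and modification_date stay aligned) you need arrays_zip, as in the answers below.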

Combine the SQL functions arrays_zip and inline:

# arrays_zip aligns the three arrays element-by-element into an array of
# structs; inline explodes that array and flattens each struct into columns
df = df.selectExpr(
    'inline(arrays_zip(emp_id, sik_id, modification_date))',
    'file_name', 'org_path', 'received_date'
)
df.show(truncate=False)
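
If you prefer the DataFrame API over selectExpr, the same generator expression can be wrapped in F.expr. A minimal sketch, assuming df is the DataFrame from the question:

from pyspark.sql import functions as F

# F.expr lets the SQL generator functions be used inside select()
df = df.select(
    F.expr('inline(arrays_zip(emp_id, sik_id, modification_date))'),
    'file_name', 'org_path', 'received_date',
)
df.show(truncate=False)

Either way, the result should match the desired output above.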

You can use arrays_zip in addition to explode:

Data preparation

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

sql = SparkSession.builder.getOrCreate()

d = {
    'emp_id': [
        ['85627230-s387s09', '98722016-s015s05', '40022035-s008s21'],
        ['67dm34-4334', '8723gv6-2022', '6f7m99-2244-ki856']
    ],
    'sik_id': [
        ['f13c1320-5c8f3daas5cd', 'f13c1384-6659-4831', '4831-aaf1-5c8f3da'],
        ['66d9-4888-aaf1', 'aaf1-5c8f3da1d5cd', 'f13c1884-66d9']
    ],
    'modification_date': [
        ['2021-04-19T11:43:32.617953Z', '2021-04-19T11:43:32.858290Z', '2021-04-19T11:43:34.027082Z'],
        ['2020-11-12T23:22:05.433107Z', '2020-11-12T20:16:51.339437Z', '2020-11-11T20:59:03.758126Z']
    ],
    'file_name': ['test1.json', 'test2.json'],
    'org_path': ['s3://my-bucket/test_prefix/vualt2/rcvd_dt=2022-01-24/', 's3://my-bucket/test_prefix/vualt2/rcvd_dt=2022-01-25/'],
    'received_date': ['2022-01-25', '2022-01-25']
}

# Build a pandas frame, then convert; the list columns become array<string>
df = pd.DataFrame(d)

sparkDF = sql.createDataFrame(df)

sparkDF.printSchema()

root
 |-- emp_id: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- sik_id: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- modification_date: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- file_name: string (nullable = true)
 |-- org_path: string (nullable = true)
 |-- received_date: string (nullable = true)
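
As a side note, the pandas round-trip is only for convenience; a sketch of the same frame built directly from tuples, reusing the d dict above:

# zip the column lists into per-row tuples and let Spark infer the types
rows = list(zip(d['emp_id'], d['sik_id'], d['modification_date'],
                d['file_name'], d['org_path'], d['received_date']))
sparkDF = sql.createDataFrame(rows, ['emp_id', 'sik_id', 'modification_date',
                                     'file_name', 'org_path', 'received_date'])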


Arrays zip and explode

# arrays_zip aligns the three arrays element-by-element into an array of
# structs; explode then emits one row per struct
sparkDF = sparkDF.withColumn(
    "exploded_tmp",
    F.arrays_zip(F.col('emp_id'), F.col('sik_id'), F.col('modification_date'))
).withColumn("exploded", F.explode("exploded_tmp"))

# Pull the struct fields back out as top-level columns
sparkDF.select(
    F.col('exploded.emp_id'),
    F.col('exploded.sik_id'),
    F.col('exploded.modification_date'),
    F.col('file_name'),
    F.col('org_path'),
    F.col('received_date'),
).show()

+-----------------+--------------------+--------------------+----------+--------------------+-------------+
|           emp_id|              sik_id|   modification_date| file_name|            org_path|received_date|
+-----------------+--------------------+--------------------+----------+--------------------+-------------+
| 85627230-s387s09|f13c1320-5c8f3daa...|2021-04-19T11:43:...|test1.json|s3://my-bucket/te...|   2022-01-25|
| 98722016-s015s05|  f13c1384-6659-4831|2021-04-19T11:43:...|test1.json|s3://my-bucket/te...|   2022-01-25|
| 40022035-s008s21|   4831-aaf1-5c8f3da|2021-04-19T11:43:...|test1.json|s3://my-bucket/te...|   2022-01-25|
|      67dm34-4334|      66d9-4888-aaf1|2020-11-12T23:22:...|test2.json|s3://my-bucket/te...|   2022-01-25|
|     8723gv6-2022|   aaf1-5c8f3da1d5cd|2020-11-12T20:16:...|test2.json|s3://my-bucket/te...|   2022-01-25|
|6f7m99-2244-ki856|       f13c1884-66d9|2020-11-11T20:59:...|test2.json|s3://my-bucket/te...|   2022-01-25|
+-----------------+--------------------+--------------------+----------+--------------------+-------------+
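
A small follow-up sketch: the values above are cut off by show()'s default 20-character truncation; passing truncate=False, and expanding the struct with a star instead of listing each field, gives the full table:

sparkDF.select(
    'exploded.*',    # expands emp_id, sik_id, modification_date
    'file_name', 'org_path', 'received_date',
).show(truncate=False)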