Pyspark 中的 Array<string> 字段映射

Array<string> fields mapping in Pyspark

我有一个数据框如下

vy.printSchema()
root
 |-- data_source: string (nullable = true)
 |-- run_time: string (nullable = true)
 |-- expectation_type: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- validation_field: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- passed: array (nullable = true)
 |    |-- element: boolean (containsNull = true)
 |-- row_count: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- unexpected_count: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- unexpected_percent: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- observed_value: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- expected_data_type: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- expected_row_count: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- expected_min_value: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- expected_max_value: array (nullable = true)
 |    |-- element: long (containsNull = true)

下面是示例数据

vy.show(10,False)
+-------------------+--------------------------------+-----------------------------------------------------------------------------------------+-----------------------------------------------------+---------------------------------------+-----------+----------------+------------------+------------------------------+------------------+------------------+------------------+------------------+
|data_source        |run_time                        |expectation_type                                                                         |validation_field                                     |passed                                 |row_count  |unexpected_count|unexpected_percent|observed_value                |expected_data_type|expected_row_count|expected_min_value|expected_max_value|
+-------------------+--------------------------------+-----------------------------------------------------------------------------------------+-----------------------------------------------------+---------------------------------------+-----------+----------------+------------------+------------------------------+------------------+------------------+------------------+------------------+
|mmm_na_activestores|2022-02-24T05:43:16.678220+00:00|[column_to_exist, non_nullable_cols, data_type, column_to_exist, row_count, sum_expected]|[country, country, country, countray,, active_stores]|[true, true, true, false, false, false]|[, 102,,,,]|[, 0,,,,]       |[, 0.0,,,,]       |[,, StringType,, 102, 22075.0]|[,, StringType,,,]|[,,,, 10,]        |[,,,,, 100]       |[,,,,, 1000]      |
+-------------------+--------------------------------+-----------------------------------------------------------------------------------------+-----------------------------------------------------+---------------------------------------+-----------+----------------+------------------+------------------------------+------------------+------------------+------------------+------------------+

预计会看到如下数据-


+-------------------+--------------------------------+----------------------+-------------------+-------------------+------------+----------------+------------------+------------------------------+------------------+------------------+------------------+------------------+
|data_source        |run_time                        |expectation_type      |validation_field   |passed             |row_count   |unexpected_count|unexpected_percent|observed_value                |expected_data_type|expected_row_count|expected_min_value|expected_max_value|
+-------------------+--------------------------------+----------------------+-------------------+-------------------+------------+----------------+------------------+------------------------------+------------------+------------------+------------------+------------------+
|mmm_na_activestores|2022-02-24T05:43:16.678220+00:00|column_to_exist       |country            |true               |            |                |                  |                              |                  |                  |                  |                  |
|mmm_na_activestores|2022-02-24T05:43:16.678220+00:00|non_nullable_cols     |country            |true               |102         |0               |0.0               |                              |                  |                  |                  |                  |
|mmm_na_activestores|2022-02-24T05:43:16.678220+00:00|data_type             |country            |true               |            |                |                  |StringType                    |StringType        |                  |                  |                  |
|mmm_na_activestores|2022-02-24T05:43:16.678220+00:00|column_to_exist       |countray           |false              |            |                |                  |                              |                  |                  |                  |                  |
|mmm_na_activestores|2022-02-24T05:43:16.678220+00:00|row_count             |null               |false              |            |                |                  |102                           |                  |10                |                  |                  |
|mmm_na_activestores|2022-02-24T05:43:16.678220+00:00|sum_expected          |active_stores      |false              |            |                |                  |22075.0                       |                  |                  |100               |1000              |
+-------------------+--------------------------------+----------------------+-------------------+-------------------+------------+----------------+------------------+------------------------------+------------------+------------------+------------------+------------------+

我尝试压缩和解压缩,但 运行 遇到问题 pyspark.sql.utils.AnalysisException:由于数据类型不匹配

,无法解决 arrays_zip

谁能帮帮我。

谢谢你!

data=[('mmm_na_activestores','2022-02-24T05:43:16.678220+00:00',
       ['column_to_exist', 'non_nullable_cols', 'data_type','column_to_exist', 'row_count', 'sum_expected'],
       ['country', 'country', 'country', 'countray',"", 'active_stores'],
       ['true', 'true', 'true', 'false', 'false', 'false'],
       ["", 102,"","","",""],["", 0,"","","",""], ["", 0.0,"","","",""],
       ["","", 'StringType',"", 102, 22075.0], 
       ["","", 'StringType',"","",""],
       ["","","","", 10,""],
       ["","","","","", 100],
       ["","","","","", 1000]
      )]
schema=('data_source', 'run_time', 'expectation_type', 'validation_field', 'passed', 'row_count', 'unexpected_count', 'unexpected_percent', 'observed_value', 'expected_data_type', 'expected_row_count', 'expected_min_value', 'expected_max_value')

inp=spark.createDataFrame(data,schema)
ip=inp.withColumn("data_source",F.lit(inp['data_source']))\
.withColumn("run_time", F.lit(inp["run_time"]))\
.withColumn("expectation_type", F.explode(inp["expectation_type"]))\
.withColumn("validation_field", F.explode(inp["validation_field"]))\
.withColumn("passed", F.explode(inp["passed"]))\
.withColumn("row_count", F.explode(inp["row_count"]))\
.withColumn("unexpected_count", F.explode(inp["unexpected_count"]))\
.withColumn("unexpected_percent", F.explode(inp["unexpected_percent"]))\
.withColumn("observed_value", F.explode(inp["observed_value"]))\
.withColumn("expected_data_type", F.explode(inp["expected_data_type"]))\
.withColumn("expected_row_count", F.explode(inp["expected_row_count"]))\
.withColumn("expected_min_value", F.explode(inp["expected_min_value"]))\
.withColumn("expected_max_value", F.explode(inp["expected_max_value"]))
ip.show()

ip=ip.select('data_source', 'run_time', 'expectation_type', 'validation_field', 'passed', 'row_count', 'unexpected_count', 'unexpected_percent', 'observed_value', 'expected_data_type', 'expected_row_count', 'expected_min_value', 'expected_max_value')
out=ip.dropDuplicates()
out.show(truncate=0)