Array<string> fields mapping in Pyspark
I have a dataframe as follows:
vy.printSchema()
root
|-- data_source: string (nullable = true)
|-- run_time: string (nullable = true)
|-- expectation_type: array (nullable = true)
| |-- element: string (containsNull = true)
|-- validation_field: array (nullable = true)
| |-- element: string (containsNull = true)
|-- passed: array (nullable = true)
| |-- element: boolean (containsNull = true)
|-- row_count: array (nullable = true)
| |-- element: long (containsNull = true)
|-- unexpected_count: array (nullable = true)
| |-- element: long (containsNull = true)
|-- unexpected_percent: array (nullable = true)
| |-- element: double (containsNull = true)
|-- observed_value: array (nullable = true)
| |-- element: string (containsNull = true)
|-- expected_data_type: array (nullable = true)
| |-- element: string (containsNull = true)
|-- expected_row_count: array (nullable = true)
| |-- element: long (containsNull = true)
|-- expected_min_value: array (nullable = true)
| |-- element: long (containsNull = true)
|-- expected_max_value: array (nullable = true)
| |-- element: long (containsNull = true)
Here is some sample data:
vy.show(10,False)
+-------------------+--------------------------------+-----------------------------------------------------------------------------------------+-----------------------------------------------------+---------------------------------------+-----------+----------------+------------------+------------------------------+------------------+------------------+------------------+------------------+
|data_source |run_time |expectation_type |validation_field |passed |row_count |unexpected_count|unexpected_percent|observed_value |expected_data_type|expected_row_count|expected_min_value|expected_max_value|
+-------------------+--------------------------------+-----------------------------------------------------------------------------------------+-----------------------------------------------------+---------------------------------------+-----------+----------------+------------------+------------------------------+------------------+------------------+------------------+------------------+
|mmm_na_activestores|2022-02-24T05:43:16.678220+00:00|[column_to_exist, non_nullable_cols, data_type, column_to_exist, row_count, sum_expected]|[country, country, country, countray,, active_stores]|[true, true, true, false, false, false]|[, 102,,,,]|[, 0,,,,] |[, 0.0,,,,] |[,, StringType,, 102, 22075.0]|[,, StringType,,,]|[,,,, 10,] |[,,,,, 100] |[,,,,, 1000] |
+-------------------+--------------------------------+-----------------------------------------------------------------------------------------+-----------------------------------------------------+---------------------------------------+-----------+----------------+------------------+------------------------------+------------------+------------------+------------------+------------------+
I expect to see output like this:
+-------------------+--------------------------------+----------------------+-------------------+-------------------+------------+----------------+------------------+------------------------------+------------------+------------------+------------------+------------------+
|data_source |run_time |expectation_type |validation_field |passed |row_count |unexpected_count|unexpected_percent|observed_value |expected_data_type|expected_row_count|expected_min_value|expected_max_value|
+-------------------+--------------------------------+----------------------+-------------------+-------------------+------------+----------------+------------------+------------------------------+------------------+------------------+------------------+------------------+
|mmm_na_activestores|2022-02-24T05:43:16.678220+00:00|column_to_exist |country |true | | | | | | | | |
|mmm_na_activestores|2022-02-24T05:43:16.678220+00:00|non_nullable_cols |country |true |102 |0 |0.0 | | | | | |
|mmm_na_activestores|2022-02-24T05:43:16.678220+00:00|data_type |country |true | | | |StringType |StringType | | | |
|mmm_na_activestores|2022-02-24T05:43:16.678220+00:00|column_to_exist |countray |false | | | | | | | | |
|mmm_na_activestores|2022-02-24T05:43:16.678220+00:00|row_count |null |false | | | |102 | |10 | | |
|mmm_na_activestores|2022-02-24T05:43:16.678220+00:00|sum_expected |active_stores |false | | | |22075.0 | | |100 |1000 |
+-------------------+--------------------------------+----------------------+-------------------+-------------------+------------+----------------+------------------+------------------------------+------------------+------------------+------------------+------------------+
I tried zipping and unzipping the arrays, but at runtime I get `pyspark.sql.utils.AnalysisException: cannot resolve 'arrays_zip(...)' due to data type mismatch`.
Can anyone help me? Thank you!
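Conceptually, the desired reshaping is a positional zip across the array columns: output row i takes element i from every array. Illustrative plain Python only (not Spark code), with made-up sample values:

```python
# Illustration only: the reshaping is a positional zip across the arrays,
# so output row i takes element i from each array column.
expectation_type = ["column_to_exist", "non_nullable_cols", "data_type"]
row_count = [None, 102, None]
passed = [True, True, True]

# zip aligns the lists by index; each tuple becomes one output row
rows = list(zip(expectation_type, row_count, passed))
for r in rows:
    print(r)
```

This is what `arrays_zip` followed by a single `explode` does in Spark, in contrast to exploding each column separately.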
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# Use None (not "") for missing elements so each array keeps a single
# element type, matching the schema above (array<long>, array<boolean>, ...)
data = [('mmm_na_activestores', '2022-02-24T05:43:16.678220+00:00',
    ['column_to_exist', 'non_nullable_cols', 'data_type', 'column_to_exist', 'row_count', 'sum_expected'],
    ['country', 'country', 'country', 'countray', None, 'active_stores'],
    [True, True, True, False, False, False],
    [None, 102, None, None, None, None],
    [None, 0, None, None, None, None],
    [None, 0.0, None, None, None, None],
    [None, None, 'StringType', None, '102', '22075.0'],
    [None, None, 'StringType', None, None, None],
    [None, None, None, None, 10, None],
    [None, None, None, None, None, 100],
    [None, None, None, None, None, 1000]
)]
schema=('data_source', 'run_time', 'expectation_type', 'validation_field', 'passed', 'row_count', 'unexpected_count', 'unexpected_percent', 'observed_value', 'expected_data_type', 'expected_row_count', 'expected_min_value', 'expected_max_value')
inp=spark.createDataFrame(data,schema)
# Attempt: explode each array column independently.
# NOTE: chaining explode() like this builds a cross product of all the
# exploded arrays instead of pairing elements by index, which is why
# dropDuplicates() was needed afterwards -- and it is still not correct.
ip=inp.withColumn("expectation_type", F.explode(inp["expectation_type"]))\
.withColumn("validation_field", F.explode(inp["validation_field"]))\
.withColumn("passed", F.explode(inp["passed"]))\
.withColumn("row_count", F.explode(inp["row_count"]))\
.withColumn("unexpected_count", F.explode(inp["unexpected_count"]))\
.withColumn("unexpected_percent", F.explode(inp["unexpected_percent"]))\
.withColumn("observed_value", F.explode(inp["observed_value"]))\
.withColumn("expected_data_type", F.explode(inp["expected_data_type"]))\
.withColumn("expected_row_count", F.explode(inp["expected_row_count"]))\
.withColumn("expected_min_value", F.explode(inp["expected_min_value"]))\
.withColumn("expected_max_value", F.explode(inp["expected_max_value"]))
ip.show()
out=ip.dropDuplicates()
out.show(truncate=0)