将字典文件读取为 pyspark 数据框
Read a file of dictionaries as pyspark dataframe
我正在处理的问题是我有一个文件(或多个文件)充满了字典,我正试图进入一个数据框。输入文件可能如下所示:
{"A":"value1", "B":"value2"}
{"A":"value2", "B":"value3"}
{"A":"value4", "B":"value5", "C":"value6"}
我正在处理的问题:
- 字典没有用换行符或逗号或任何东西分隔。这是一个单行文件,不幸的是,我对此无能为力。
- 字典可以有不同数量的键。但我有最终数据框的模式。
在上面的例子中,期望的结果是:
A B C
value1 value2 null
value2 value3 null
value4 value5 value6
到目前为止我尝试过的:
spark_sql_context.read.json(path_to_file)
这只读取第一个字典和 returns 一个只有一行的 pyspark 数据框。我也试过将其作为文本文件阅读:
data_rdd = spark_context.textFile(path_to_file)
问题是我不知道:
- 如何拆分行,因为字典和
之间没有分隔符
- 字典有不同的长度。
如果您能指出解决此问题的方法或解决方案,我将不胜感激。
您可以将其作为文本阅读,然后按 }{
拆分以获得 JSON 个对象的数组。为此,首先,我们用 };{
替换 }{
,然后用 ;
拆分。
df = spark.read.text(path)
df = df.withColumn("values", explode(split(regexp_replace(col("value"), "\}\{", "\};\{"), ";")))
df.show()
#+------------------------------------------+
#|value |
#+------------------------------------------+
#|{"A":"value1", "B":"value2"} |
#|{"A":"value2", "B":"value3"} |
#|{"A":"value4", "B":"value5", "C":"value6"}|
#+------------------------------------------+
现在,将 from_json
与您的 schema
一起使用,将 json 解析为结构:
schema = StructType([StructField("A", StringType(), True),
StructField("B", StringType(), True),
StructField("C", StringType(), True)
])
df = df.withColumn("value", from_json(col("value"), schema)).select("value.*")
df.show()
#+------+------+------+
#|A |B |C |
#+------+------+------+
#|value1|value2|null |
#|value2|value3|null |
#|value4|value5|value6|
#+------+------+------+
我正在处理的问题是我有一个文件(或多个文件)充满了字典,我正试图进入一个数据框。输入文件可能如下所示:
{"A":"value1", "B":"value2"}
{"A":"value2", "B":"value3"}
{"A":"value4", "B":"value5", "C":"value6"}
我正在处理的问题:
- 字典没有用换行符或逗号或任何东西分隔。这是一个单行文件,不幸的是,我对此无能为力。
- 字典可以有不同数量的键。但我有最终数据框的模式。
在上面的例子中,期望的结果是:
A B C
value1 value2 null
value2 value3 null
value4 value5 value6
到目前为止我尝试过的:
spark_sql_context.read.json(path_to_file)
这只读取第一个字典和 returns 一个只有一行的 pyspark 数据框。我也试过将其作为文本文件阅读:
data_rdd = spark_context.textFile(path_to_file)
问题是我不知道:
- 如何拆分行,因为字典和 之间没有分隔符
- 字典有不同的长度。
如果您能指出解决此问题的方法或解决方案,我将不胜感激。
您可以将其作为文本阅读,然后按 }{
拆分以获得 JSON 个对象的数组。为此,首先,我们用 };{
替换 }{
,然后用 ;
拆分。
df = spark.read.text(path)
df = df.withColumn("values", explode(split(regexp_replace(col("value"), "\}\{", "\};\{"), ";")))
df.show()
#+------------------------------------------+
#|value |
#+------------------------------------------+
#|{"A":"value1", "B":"value2"} |
#|{"A":"value2", "B":"value3"} |
#|{"A":"value4", "B":"value5", "C":"value6"}|
#+------------------------------------------+
现在,将 from_json
与您的 schema
一起使用,将 json 解析为结构:
schema = StructType([StructField("A", StringType(), True),
StructField("B", StringType(), True),
StructField("C", StringType(), True)
])
df = df.withColumn("value", from_json(col("value"), schema)).select("value.*")
df.show()
#+------+------+------+
#|A |B |C |
#+------+------+------+
#|value1|value2|null |
#|value2|value3|null |
#|value4|value5|value6|
#+------+------+------+