将字符串列转换为 pyspark sql 数据框中的字典

Convert column of strings to dictionaries in pyspark sql dataframe

我必须使用每行都是一个 json 对象的文件格式。例如:

{'Attribute 1': 'A', 'Attribute 2': 1.5, 'Attribute 3': ['A','B','C'], 'Attribute 4': {'A': 5}}
{'Attribute 1': 'B', 'Attribute 2': 2.0, 'Attribute 3': ['A'], 'Attribute 4': {'A': 4}}
{'Attribute 1': 'C', 'Attribute 2': 1.7, 'Attribute 3': ['A','C'], 'Attribute 4': {'A': 3}}

请注意,这不是有效的 json 文件格式,因为它未包含在数组中。此外,实际结构要大得多且嵌套更多。这些文件分布在 s3 中。我以前只用过parquet或csv,所以我不知道如何读取这些文件。

我目前正在编写一个进程来将此数据与其他几个 table 连接起来,并且由于数据很大并且位于 s3 中,所以我在 emr 集群中使用 pyspark.sql 来做手术。我可以使用以下方法创建一个 table,其中包含作为字符串的对象的单个列:

from pyspark.sql import SQLContext
from pyspark.sql.types import StructType, StructField, StringType
sqlContext = SQLContext(sc)

schema = StructType([
    StructField('json_format', StringType())
])

context = sqlContext.read
context = context.schema(schema)

df = context.load(
    folder_path,
    format='com.databricks.spark.csv',
    delimiter=','
)
df.createOrReplaceTempView('my_table')

如何将此列转换为可以访问各种属性的字典?是否有等效的 lambda 函数?

为了使 json 对象有效,我们可以将所有 ' 替换为 " 然后使用 get_json_object() 我们可以访问的函数属性。

Example:

df=sqlContext.sql("""select string("{'Attribute1': 'A', 'Attribute 2': 1.5, 'Attribute 3': ['A','B','C'], 'Attribute 4': {'A': 5}}") as str""")

#replacing ' with " using regexp_replace
df=df.withColumn("str",regexp_replace(col("str"),"\'","\""))
df.show(10,False)

#+----------------------------------------------------------------------------------------------+
#|str                                                                                           |
#+----------------------------------------------------------------------------------------------+
#|{"Attribute1": "A", "Attribute 2": 1.5, "Attribute 3": ["A","B","C"], "Attribute 4": {"A": 5}}|
#+----------------------------------------------------------------------------------------------+

#registering temp table
df.registerTempTable("tt")

#accessing Attribute 3
sqlContext.sql("select get_json_object(str,'$.Attribute 3') from tt").show()
#+-------------+
#|          _c0|
#+-------------+
#|["A","B","C"]|
#+-------------+

#accessing first array element from Attribute 3
sqlContext.sql("select get_json_object(str,'$.Attribute 3[0]') from tt").show()
#+---+
#|_c0|
#+---+
#|  A|
#+---+

#accessing Attribute 2
sqlContext.sql("select get_json_object(str,'$.Attribute 2') from tt").show()
#+---+
#|_c0|
#+---+
#|1.5|
#+---+