如何将结构如 (key1, list(key2, value)) 的列表转换为 pyspark 中的数据框?

How to convert a list with structure like (key1, list(key2, value)) into a dataframe in pyspark?

我有如下所示的列表:

类型如下:

[(key1, [(key11, value11), (key12, value12)]), (key2, [(key21, value21), (key22, value22)...])...]

示例结构如下所示:

[('1052762305',
  [('1007819788', 0.9206884810054885),
   ('1005886801', 0.913818268123084),
   ('1003863766', 0.9131746152849486),
   ('1007811435', 0.9128666156173751),
   ('1005879599', 0.9126368405937075),
   ('1003705572', 0.9122051062936369),
   ('1007804896', 0.9083424459788203),
   ('1005890270', 0.8982097535650703),
   ('1007806781', 0.8708761186829758),
   ('1003670458', 0.8452789033694487)]),
 ('1064808607',
  [('1007804896', 0.9984397647563017),
   ('1003705572', 0.9970498347406341),
   ('1005879599', 0.9951581013190172),
   ('1007811435', 0.9934813787902085),
   ('1005886801', 0.9930572794622374),
   ('1003863766', 0.9928815742735568),
   ('1007819788', 0.9869723713790797),
   ('1005890270', 0.9642640856016443),
   ('1007806781', 0.9211558765137313),
   ('1003670458', 0.8519872445941068)])]

我想将其转换成以下形式的数据框

   key1          key2             score
1052762305    1007819788    0.9206884810054885
1052762305    1005886801    0.913818268123084
1052762305    1003863766    0.9131746152849486
  ...            ...              ...
1064808607    1007804896    0.9984397647563017
1064808607    1003705572    0.9970498347406341
1064808607    1005879599    0.9951581013190172
  ...            ...              ...

我们如何在 pyspark 中实现它?

您基本上需要执行以下操作:

  • 根据您的列表创建数据框
  • 使用 explode
  • 将数组元素中的对提升为单独的行
  • 通过select
  • 从对中提取键和值

这可以通过这样的方式完成(源数据位于名为 a 的变量中):

from pyspark.sql.functions import explode, col
df = spark.createDataFrame(a, ['key1', 'val'])
df2 = df.select(col('key1'), explode(col('val')).alias('val'))
df3 = df2.select('key1', col('val')._1.alias('key2'), col('val')._2.alias('value'))

我们可以检查架构和数据是否匹配:

>>> df3.printSchema()
root
 |-- key1: string (nullable = true)
 |-- key2: string (nullable = true)
 |-- value: double (nullable = true)

>>> df3.show(2)
+----------+----------+------------------+
|      key1|      key2|             value|
+----------+----------+------------------+
|1052762305|1007819788|0.9206884810054885|
|1052762305|1005886801| 0.913818268123084|
+----------+----------+------------------+
only showing top 2 rows

我们还可以检查架构以获得中间结果:

>>> df.printSchema()
root
 |-- key1: string (nullable = true)
 |-- val: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- _1: string (nullable = true)
 |    |    |-- _2: double (nullable = true)

>>> df2.printSchema()
root
 |-- key1: string (nullable = true)
 |-- val: struct (nullable = true)
 |    |-- _1: string (nullable = true)
 |    |-- _2: double (nullable = true)

您可以使用输入预先创建架构。使用 explode 并访问值结构中的元素。

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F
    from pyspark.sql.types import  StructType, StructField,StringType,ArrayType, DoubleType


    spark = SparkSession.builder \
        .appName('SO')\
        .getOrCreate()


        schema = StructType([StructField("key1",StringType()), StructField("value",ArrayType(
            StructType([ StructField("key2", StringType()),
               StructField("score", DoubleType())])
        )) ])


    df = spark.createDataFrame(
        [('1052762305',
          [('1007819788', 0.9206884810054885),
           ('1005886801', 0.913818268123084),
           ('1003863766', 0.9131746152849486),
           ('1007811435', 0.9128666156173751),
           ('1005879599', 0.9126368405937075),
           ('1003705572', 0.9122051062936369),
           ('1007804896', 0.9083424459788203),
           ('1005890270', 0.8982097535650703),
           ('1007806781', 0.8708761186829758),
           ('1003670458', 0.8452789033694487)]),

         ('1064808607',
          [('1007804896', 0.9984397647563017),
           ('1003705572', 0.9970498347406341),
           ('1005879599', 0.9951581013190172),
           ('1007811435', 0.9934813787902085),
           ('1005886801', 0.9930572794622374),
           ('1003863766', 0.9928815742735568),
           ('1007819788', 0.9869723713790797),
           ('1005890270', 0.9642640856016443),
           ('1007806781', 0.9211558765137313),
           ('1003670458', 0.8519872445941068)])
         ],schema
    )

    df.show()

    +----------+--------------------+
    |      key1|          value    |
    +----------+--------------------+
    |1052762305|[[1007819788, 0.9...|
    |1064808607|[[1007804896, 0.9...|
    +----------+--------------------+

    df.printSchema()

    root
     |-- key1: string (nullable = true)
     |-- value: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- key2: string (nullable = true)
     |    |    |-- score: double (nullable = true)



    df1=df.select('key1',F.explode('value').alias('value'))
    df1.show()
    +----------+--------------------+
    |      key1|          value     |
    +----------+--------------------+
    |1052762305|[1007819788, 0.92...|
    |1052762305|[1005886801, 0.91...|
    |1052762305|[1003863766, 0.91...|
    |1052762305|[1007811435, 0.91...|
    |1052762305|[1005879599, 0.91...|
    |1052762305|[1003705572, 0.91...|
    |1052762305|[1007804896, 0.90...|
    |1052762305|[1005890270, 0.89...|
    |1052762305|[1007806781, 0.87...|
    |1052762305|[1003670458, 0.84...|
    |1064808607|[1007804896, 0.99...|
    |1064808607|[1003705572, 0.99...|
    |1064808607|[1005879599, 0.99...|
    |1064808607|[1007811435, 0.99...|
    |1064808607|[1005886801, 0.99...|
    |1064808607|[1003863766, 0.99...|
    |1064808607|[1007819788, 0.98...|
    |1064808607|[1005890270, 0.96...|
    |1064808607|[1007806781, 0.92...|
    |1064808607|[1003670458, 0.85...|
    +----------+--------------------+

    df1.printSchema()

    root
     |-- key1: string (nullable = true)
     |-- value: struct (nullable = true)
     |    |-- key2: string (nullable = true)
     |    |-- score: double (nullable = true)


    df1.select("key1", "value.key2","value.score").show()


    +----------+----------+------------------+
    |      key1|      key2|             score|
    +----------+----------+------------------+
    |1052762305|1007819788|0.9206884810054885|
    |1052762305|1005886801| 0.913818268123084|
    |1052762305|1003863766|0.9131746152849486|
    |1052762305|1007811435|0.9128666156173751|
    |1052762305|1005879599|0.9126368405937075|
    |1052762305|1003705572|0.9122051062936369|
    |1052762305|1007804896|0.9083424459788203|
    |1052762305|1005890270|0.8982097535650703|
    |1052762305|1007806781|0.8708761186829758|
    |1052762305|1003670458|0.8452789033694487|
    |1064808607|1007804896|0.9984397647563017|
    |1064808607|1003705572|0.9970498347406341|
    |1064808607|1005879599|0.9951581013190172|
    |1064808607|1007811435|0.9934813787902085|
    |1064808607|1005886801|0.9930572794622374|
    |1064808607|1003863766|0.9928815742735568|
    |1064808607|1007819788|0.9869723713790797|
    |1064808607|1005890270|0.9642640856016443|
    |1064808607|1007806781|0.9211558765137313|
    |1064808607|1003670458|0.8519872445941068|