创建数据框特定模式:以大写字母开头的 StructField

creating dataframe specific schema : StructField starting with capital letter

出于看似简单的好奇心,为冗长的 post 道歉,但我想提供完整的背景...

在 Databricks 中,我正在根据特定模式定义创建 "row" 数据,然后将该行插入到空数据框中(也基于相同的特定模式)。

模式定义如下所示:

myschema_xb = StructType(
  [
    StructField("_xmlns", StringType(), True),
    StructField("_Version", DoubleType(), True),
    StructField("MyIds",
      ArrayType(
        StructType(
          [
            StructField("_ID", StringType(), True),
            StructField("_ID_Context", StringType(), True),
            StructField("_Type", LongType(), True),
          ]
        ),
        True
      ),
      True
    ),
  ]
)

因此行条目是:

myRow = Row(
    _xmlns="http://some.where.com",
    _Version=12.3,
    MyIds=[
        Row(
          _ID="XY",
          _ID_Context="Exxwhy",
          _Type=9
        ),
        Row(
          _ID="9152",
          _ID_Context="LNUMB",
          _Type=21
        ),
    ]
)

最后,databricks 笔记本代码是:

mydf = spark.createDataFrame(sc.emptyRDD(), myschema_xb)
rows = [myRow]
rdf = spark.createDataFrame(rows, myschema_xb)
appended = mydf.union(rdf)

调用rdf = spark.createDataFrame(rows, myschema_xb)导致异常:

ValueError: Unexpected tuple 'h' with StructType.

现在我很好奇的部分是,如果我将元素 MyIds 更改为 myIds(即第一个字母小写),代码 works,我的新数据框(appended)只有一行数据。

此异常是什么意思?为什么当我更改元素的大小写时它会消失?

(仅供参考,我们的数据块运行时环境是 Scala 2.11)

谢谢。

问题应该来自 Row 对象如何对 keys/fields 进行排序,来自 documentation

Row can be used to create a row object by using named arguments, the fields will be sorted by names.

myschema_xb中,三列的定义顺序是[_xmlns, _Version, MyIds]。当您使用键定义 myRow 时:(_xmlns, _Version, MyIds),实际生成的 Row 对象将是:

Row(MyIds=[Row(_ID='XY', _ID_Context='Exxwhy', _Type=9), Row(_ID='9152', _ID_Context='LNUMB', _Type=21)], _Version=12.3, _xmlns='http://some.where.com')

其中 MyIds 移到了第一列,这与模式不匹配,因此产生错误。当您使用小写的 column-name myIds 时,Row 对象中的键被排序为 ['_Version', '_xmlns', 'myIds'],右列中有 myIds,但 _Version 和 [=20= 】 切换。这不会产生错误,因为简单数据类型可以通过类型转换,但生成的数据帧不正确。

要解决这个问题,您应该设置类似行的 class 并自定义键的顺序以确保字段的顺序与您的架构中显示的顺序完全匹配:

from pyspark.sql import Row

MyOuterROW = Row('_xmlns', '_Version', 'MyIds')
MyInnerRow = Row('_ID', '_ID_Context', '_Type')

myRow = MyOuterROW( 
    "http://some.where.com", 
    12.3, 
    [ 
        MyInnerROW("XY", "Exxwhy", 9), 
        MyInnerROW("9152", "LNUMB", 21) 
    ] 
)              
print(myRow)
#Row(_xmlns='http://some.where.com', _Version=12.3, MyIds=[Row(_ID='XY', _ID_Context='Exxwhy', _Type=9), Row(_ID='9152', _ID_Context='LNUMB', _Type=21)])

rdf = spark.createDataFrame([myRow], schema=myschema_xb)