转置从 Pyspark 中解析的 XML 生成的数据框列

Transpose dataframe columns generated from parsed XML in Pyspark

作为问题陈述之一,我正在使用 PySpark 解析 XML 数据。

下面是示例数据-

<?xml version="1.0" encoding="UTF-8" standalone="no" ?>
<component>Engineering

  <headers>
    <header>
      <name>Date</name>
      <value>05/05/2021 14:50:03</value>
    </header>
    <header>
      <name>StdName</name>
      <value>CoreEngineering</value>
    </header>
    <header>
      <name>ID</name>
      <value>12432AA</value>
    </header>
    <header>
      <name>DeviceType</name>
      <value>EngineGear</value>
    </header>
  </headers>
  
   <headers>
    <header>
      <name>Date</name>
      <value>05/05/2021 14:59:13</value>
    </header>
    <header>
      <name>StdName</name>
      <value>CoreEngineering</value>
    </header>
    <header>
      <name>ID</name>
      <value>98344AA</value>
    </header>
    <header>
      <name>DeviceType</name>
      <value>EngineExhaust</value>
    </header>
  </headers>
  
  
 </component>

使用 pyspark 在数据块中解析此 xml 文件时,我使用以下逻辑 -

timestamp = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')

df = spark.read \
    .format("com.databricks.spark.xml") \
    .option("rootTag", "headers") \
    .option("rowTag", "header") \
    .load("/mnt/test/sourcedata/sample.xml") \
    .withColumn("processeddatetime",unix_timestamp(lit(timestamp),'yyyy-MM-dd HH:mm:ss').cast("timestamp"))

收到的输出是:

但是,我想知道如何转置接收到的数据,因为进一步的数据转换需要这样做。 所需的数据集应具有如下架构 -

如何将此数据转换为 PySpark 中的上述预期模式?

在读取 xml 文件后对数据进行分组将很困难,因为数据不包含指示行属于一起的分组标准。

而不是使用 component 作为 rootTagheaders 作为 rowTag,您可以获得所需的行分组。在一行中,您可以根据需要使用 map_from_entries to separate the entries from each other and then select 地图的各个元素:

df = spark.read \
    .format("com.databricks.spark.xml") \
    .option("rootTag", "component") \
    .option("rowTag", "headers") \
    .load("test.xml") \
    .withColumn("tmp", F.map_from_entries("header")) \
    .selectExpr("'Engineering' as Component", 
          "tmp['Date'] as Date", 
          "tmp['StdName'] as StdName", 
          "tmp['ID'] as ID", 
          "tmp['DeviceType'] as DeviceType") \
    .withColumn("processeddatetime",F.unix_timestamp(F.lit(timestamp),'yyyy-MM-dd HH:mm:ss').cast("timestamp")) \

Component 的值在此处进行了硬编码,因为您无法从 Spark xml.

中的根标记元素中获取值