Ambiguous reference to array fields in pyspark

I'm new to pyspark. I want to be able to read values from my kafka topic. To do that, I created a schema for the messages in the topic.

Here is a sample message from my kafka topic:

{
   "action": "string",
   "id": "string",
   "epoch": "long",
   "entity": {
       "type": "string",
       "sources": [{
           "items": [
               {"identifier": "string"},
               {"identifier": "string"}
           ]
       }]
   }
}

Here is my schema:

root
 |-- value: struct (nullable = true)
 |    |-- action: string (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- epoch: long (nullable = true)
 |    |-- entity: struct (nullable = true)
 |    |    |-- type: string (nullable = true)
 |    |-- entity: struct (nullable = true)
 |    |    |-- sources: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- items: array (nullable = true)
 |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |-- identifier: string (nullable = true)

However, I get an error when I try to select the fields I'm interested in.

from pyspark.sql.functions import col, from_json

value_df = df.select(
    from_json(col("value").cast("string"), Schema).alias("value"),
)

explode_df = value_df.select(
    "value.id", 
    "value.action", 
    "value.epoch",
    "value.entity.type",
    "value.entity.sources.items.identifier", 
)

explode_df.printSchema()

Here is the error I get:

An error was encountered:
Ambiguous reference to fields StructField(entity,StructType(StructField(type,StringType,true)),true), StructField(entity,StructType(StructField(sources,ArrayType(StructType(StructField(items,ArrayType(StructType(StructField(identifier,StringType,true)),true),true)),true),true)),true)
Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 1671, in select
    jdf = self._jdf.select(self._jcols(*cols))
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 117, in deco
    raise converted from None
pyspark.sql.utils.AnalysisException: Ambiguous reference to fields StructField(entity,StructType(StructField(type,StringType,true)),true), StructField(entity,StructType(StructField(sources,ArrayType(StructType(StructField(items,ArrayType(StructType(StructField(identifier,StringType,true)),true),true)),true),true)),true)

I believe this is because sources and items are both arrays. Is there a way to read arrays into a df using pyspark?

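It looks like entity appears twice in your schema (once holding type and once holding sources), and that is what makes the reference ambiguous. Declaring entity once, with both type and sources inside it, should resolve it.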