from_json returns Apache Spark Kafka readStream 中为空

from_json returns null in Apache Spark Kafka readStream

我正在尝试阅读 kafka 主题并使用 pySpark 在控制台上显示数据。我已经定义了 from_json 架构并尝试匹配和显示它。但是,df returns nulls.

kafka topic 和 schema 中的原始对象是 blow。

{
  "kind": "youtube#videoListResponse",
  "etag": "jUow4VqgbKTDD9d1QI8TBQdM0po",
  "items": [
    {
      "kind": "youtube#video",
      "etag": "SRkYji_KdZvK3LDoACVdkHcm-Og",
      "id": "E4R_WJBqaaQ",
      "snippet": {
        "publishedAt": "2022-03-30T13:51:05Z",
        "channelId": "UCw9DyZg3_F0bIks2jrEgQAA",
        "title": "Brother Job #sadiqahmed",
        "description": "",
        "thumbnails": {
          "default": {
            "url": "https://i.ytimg.com/vi/E4R_WJBqaaQ/default.jpg",
            "width": 120,
            "height": 90
          },
          "medium": {
            "url": "https://i.ytimg.com/vi/E4R_WJBqaaQ/mqdefault.jpg",
            "width": 320,
            "height": 180
          },
          "high": {
            "url": "https://i.ytimg.com/vi/E4R_WJBqaaQ/hqdefault.jpg",
            "width": 480,
            "height": 360
          },
          "standard": {
            "url": "https://i.ytimg.com/vi/E4R_WJBqaaQ/sddefault.jpg",
            "width": 640,
            "height": 480
          },
          "maxres": {
            "url": "https://i.ytimg.com/vi/E4R_WJBqaaQ/maxresdefault.jpg",
            "width": 1280,
            "height": 720
          }
        },
        "channelTitle": "Sadiq Ahmed",
        "categoryId": "22",
        "liveBroadcastContent": "none",
        "localized": {
          "title": "Brother Job #sadiqahmed",
          "description": ""
        }
      },
      "statistics": {
        "viewCount": "5155911",
        "likeCount": "559596",
        "favoriteCount": "0",
        "commentCount": "844"
      }
    }
  ],
  "nextPageToken": "CAEQAA",
  "pageInfo": {
    "totalResults": 200,
    "resultsPerPage": 1
  }
}

json 模式定义

mySchema = StructType([
    StructField("items",ArrayType(StructType([
                        StructField("id", StringType, True),
                        StructField("snippet", StructType([
                                StructField("channelId", StringType, True),
                                StructField("channelTitle", StringType, True),
                                StructField("categoryId", IntegerType, True),
                                StructField("publishedAt", TimestampType, True),
                                StructField("title", StringType, True)]), 
                            True),
                        StructField("statistics", StructType([
                                StructField("commentCount", IntegerType, True),
                                StructField("favoriteCount", IntegerType, True),
                                StructField("likeCount", IntegerType, True),
                                StructField("viewCount", IntegerType, True)]),
                            True)]), True), True)                    
])

读取Kafka Topic并转换为from_json

df = spark.read.format("kafka")\
    .option("kafka.bootstrap.servers", kafkaServer)\
    .option("subscribePattern", topic_name_read)\
    .option("startingOffsets", "earliest")\
    .load()\
    .selectExpr("CAST(value AS STRING)")\
    .select(F.from_json(F.col("value").cast("string"), mySchema).alias("data"))\
    .select("data.*")

当前输出。

+-----+
|items|
+-----+
|null |
+-----+

我的模式定义有什么问题。提前谢谢你。

最常见的情况是,它为空,因为字段不匹配。我相信您需要定义每个字段,因此您应该尝试添加 kindetag 以及您遗漏的任何其他内容。

如果您真的只需要字符串中的某些字段,您也可以使用 get_json_object 而不是定义架构。