from_json returns null in Apache Spark Kafka readStream
I am trying to read a Kafka topic and display the data on the console using pySpark. I have defined a schema for from_json and tried to match and display it. However, the DataFrame returns nulls.
The raw object in the Kafka topic, and my schema, are below.
{
  "kind": "youtube#videoListResponse",
  "etag": "jUow4VqgbKTDD9d1QI8TBQdM0po",
  "items": [
    {
      "kind": "youtube#video",
      "etag": "SRkYji_KdZvK3LDoACVdkHcm-Og",
      "id": "E4R_WJBqaaQ",
      "snippet": {
        "publishedAt": "2022-03-30T13:51:05Z",
        "channelId": "UCw9DyZg3_F0bIks2jrEgQAA",
        "title": "Brother Job #sadiqahmed",
        "description": "",
        "thumbnails": {
          "default": {
            "url": "https://i.ytimg.com/vi/E4R_WJBqaaQ/default.jpg",
            "width": 120,
            "height": 90
          },
          "medium": {
            "url": "https://i.ytimg.com/vi/E4R_WJBqaaQ/mqdefault.jpg",
            "width": 320,
            "height": 180
          },
          "high": {
            "url": "https://i.ytimg.com/vi/E4R_WJBqaaQ/hqdefault.jpg",
            "width": 480,
            "height": 360
          },
          "standard": {
            "url": "https://i.ytimg.com/vi/E4R_WJBqaaQ/sddefault.jpg",
            "width": 640,
            "height": 480
          },
          "maxres": {
            "url": "https://i.ytimg.com/vi/E4R_WJBqaaQ/maxresdefault.jpg",
            "width": 1280,
            "height": 720
          }
        },
        "channelTitle": "Sadiq Ahmed",
        "categoryId": "22",
        "liveBroadcastContent": "none",
        "localized": {
          "title": "Brother Job #sadiqahmed",
          "description": ""
        }
      },
      "statistics": {
        "viewCount": "5155911",
        "likeCount": "559596",
        "favoriteCount": "0",
        "commentCount": "844"
      }
    }
  ],
  "nextPageToken": "CAEQAA",
  "pageInfo": {
    "totalResults": 200,
    "resultsPerPage": 1
  }
}
JSON schema definition
mySchema = StructType([
    StructField("items", ArrayType(StructType([
        StructField("id", StringType, True),
        StructField("snippet", StructType([
            StructField("channelId", StringType, True),
            StructField("channelTitle", StringType, True),
            StructField("categoryId", IntegerType, True),
            StructField("publishedAt", TimestampType, True),
            StructField("title", StringType, True)]),
            True),
        StructField("statistics", StructType([
            StructField("commentCount", IntegerType, True),
            StructField("favoriteCount", IntegerType, True),
            StructField("likeCount", IntegerType, True),
            StructField("viewCount", IntegerType, True)]),
            True)]), True), True)
])
Reading the Kafka topic and converting with from_json
df = spark.read.format("kafka")\
    .option("kafka.bootstrap.servers", kafkaServer)\
    .option("subscribePattern", topic_name_read)\
    .option("startingOffsets", "earliest")\
    .load()\
    .selectExpr("CAST(value AS STRING)")\
    .select(F.from_json(F.col("value").cast("string"), mySchema).alias("data"))\
    .select("data.*")
Current output:
+-----+
|items|
+-----+
|null |
+-----+
What is wrong with my schema definition? Thanks in advance.
Most commonly it is null because the fields do not match. I believe you need to define every field, so you should try adding kind, etag, and anything else you left out.
If you really only need certain fields out of the string, you could also use get_json_object instead of defining a schema.