如何将 kafka 消息值转换为特定模式?
How to convert kafka message value to a particular schema?
我正在尝试使用 Pyspark 从 Kafka 主题中读取数据。我想将该数据转换为特定模式。但是做不到。
这是我尝试过的:
>> df = spark.read.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "test1").load()
>> userSchema = StructType().add("Name", StringType(), True).add("Age", IntegerType(), True)
>> df1 = df.selectExpr("CAST(value AS STRING)")
>> df2 = df1.select(from_json(col("value"), userSchema))
>> df2.printSchema()
root
|-- jsontostructs(value): struct (nullable = true)
| |-- Name: string (nullable = true)
| |-- Age: integer (nullable = true)
我想要的是:
>> df2.printSchema()
root
|-- Name: string (nullable = true)
|-- Age: integer (nullable = true)
有什么方法可以得到想要的模式吗?
对于面临相同问题的任何人,这是我如何实现的:
df2 = df1.select(from_json(col("value"),userSchema)).select("jsontostructs(value).*")
我正在尝试使用 Pyspark 从 Kafka 主题中读取数据。我想将该数据转换为特定模式。但是做不到。
这是我尝试过的:
>> df = spark.read.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "test1").load()
>> userSchema = StructType().add("Name", StringType(), True).add("Age", IntegerType(), True)
>> df1 = df.selectExpr("CAST(value AS STRING)")
>> df2 = df1.select(from_json(col("value"), userSchema))
>> df2.printSchema()
root
|-- jsontostructs(value): struct (nullable = true)
| |-- Name: string (nullable = true)
| |-- Age: integer (nullable = true)
我想要的是:
>> df2.printSchema()
root
|-- Name: string (nullable = true)
|-- Age: integer (nullable = true)
有什么方法可以得到想要的模式吗?
对于面临相同问题的任何人,这是我如何实现的:
df2 = df1.select(from_json(col("value"),userSchema)).select("jsontostructs(value).*")