How to write a custom DataFrame to Event Hubs from ADLS
I want to write a custom DataFrame to Event Hubs.
val customDf = spark.read.json("path/to/json")
Event Hubs connection string:
val connectionString = new com.microsoft.azure.eventhubs.ConnectionStringBuilder(
    "Endpoint=sb://test.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxxxxxxxxxxxxxxxx=")
  .setEventHubName("test")
val ehConf = EventHubsConf(connectionString.toString).setConsumerGroup("testing")
val eventhubSchema = spark.readStream
  .format("eventhubs")
  .options(ehConf.toMap)
  .option("eventhubs.partition.count", "4")
  .load()
eventhubSchema.printSchema
This prints the default schema of the Event Hubs message body.
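For reference, the default schema exposed by the azure-event-hubs-spark connector looks roughly like the following (a sketch based on the connector's documentation; exact fields can vary by connector version):

root
 |-- body: binary (nullable = true)
 |-- partition: string (nullable = true)
 |-- offset: string (nullable = true)
 |-- sequenceNumber: long (nullable = true)
 |-- enqueuedTime: timestamp (nullable = true)
 |-- publisher: string (nullable = true)
 |-- partitionKey: string (nullable = true)
 |-- properties: map (nullable = true)
 |-- systemProperties: map (nullable = true)

The key point is that the connector reads and writes event payloads through the body column.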
Now I want to write the customDf above to Event Hubs.
Method 1:
ds = customDf \
.selectExpr("partitionKey", "body") \
.writeStream \
.format("eventhubs") \
.options(ehConf.toMap) \
.option("checkpointLocation", "///output.txt") \
.start()
Method 2:
ds = customDf \
.writeStream \
.format("eventhubs") \
.options(ehConf.toMap) \
.option("checkpointLocation", "///output.txt") \
.start()
How can I write customDf to Event Hubs? I even tried select(get_json_object(cast to string type)), but I get:
org.apache.spark.sql.AnalysisException: cannot resolve 'body' given input columns
How can I write customDf to Event Hubs?
You need to transform the data in your DataFrame into a single column object (binary or string); which one really depends on your consumer. The AnalysisException above occurs because customDf only has the columns found in the source JSON, so there is no body (or partitionKey) column for selectExpr to resolve. The simplest approach is to pack all of the data into JSON using a combination of the to_json + struct functions:
import pyspark.sql.functions as F
stream = customDf \
.select(F.to_json(F.struct("*")).alias("body")) \
.writeStream \
.format("eventhubs") \
.options(ehConf.toMap) \
.option("checkpointLocation", "...") \
.start()
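Since the question code is in Scala, here is a minimal Scala sketch of the same idea (reusing the ehConf defined above; the column construction mirrors the PySpark snippet):

// Pack every column of customDf into a single JSON string named `body`,
// which is the column the eventhubs sink expects.
import org.apache.spark.sql.functions.{struct, to_json}

val stream = customDf
  .select(to_json(struct("*")).alias("body"))
  .writeStream
  .format("eventhubs")
  .options(ehConf.toMap)
  .option("checkpointLocation", "...")
  .start()

Also note that customDf was created with spark.read.json, so it is a batch DataFrame and writeStream cannot be called on it. The connector also supports batch writes; in that case the write would look something like:

// Batch write: same body column, but through df.write instead of writeStream
customDf
  .select(to_json(struct("*")).alias("body"))
  .write
  .format("eventhubs")
  .options(ehConf.toMap)
  .save()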