使用 pySpark 在 Azure Databricks 中使用来自 EventHub 的事件
Consume events from EventHub In Azure Databricks using pySpark
我可以看到在 Azure Databricks 中使用 Scala 从事件中心消费事件的 spark 连接器和指南。
但是,我们如何使用 pySpark 从 azure databricks 中使用事件中心中的事件?
任何suggestions/documentation细节都会有所帮助。谢谢
下面是从 pyspark on azure data-bricks 的事件中心读取事件的代码段 data-bricks。
// With an entity path
val with = "Endpoint=sb://SAMPLE;SharedAccessKeyName=KEY_NAME;SharedAccessKey=KEY;EntityPath=EVENTHUB_NAME"
# Source with default settings
connectionString = "Valid EventHubs connection string."
ehConf = {
'eventhubs.connectionString' : connectionString
}
df = spark \
.readStream \
.format("eventhubs") \
.options(**ehConf) \
.load()
readInStreamBody = df.withColumn("body", df["body"].cast("string"))
display(readInStreamBody)
我认为如果您使用 spark 版本 2.4.5 或更高版本以及 Azure 事件中心连接器版本 2.3,则需要稍作修改。 15或以上
对于2.3.15版本及以上版本,配置字典要求对连接字符串进行加密,因此您需要按照下面的代码片段进行传递。
connectionString = "Endpoint=sb://SAMPLE;SharedAccessKeyName=KEY_NAME;SharedAccessKey=KEY;EntityPath=EVENTHUB_NAME"
ehConf = {}
ehConf['eventhubs.connectionString'] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString)
df = spark \
.readStream \
.format("eventhubs") \
.options(**ehConf) \
.load()
readInStreamBody = df.withColumn("body", df["body"].cast("string"))
display(readInStreamBody)
我可以看到在 Azure Databricks 中使用 Scala 从事件中心消费事件的 spark 连接器和指南。
但是,我们如何使用 pySpark 从 azure databricks 中使用事件中心中的事件?
任何suggestions/documentation细节都会有所帮助。谢谢
下面是从 pyspark on azure data-bricks 的事件中心读取事件的代码段 data-bricks。
// With an entity path
val with = "Endpoint=sb://SAMPLE;SharedAccessKeyName=KEY_NAME;SharedAccessKey=KEY;EntityPath=EVENTHUB_NAME"
# Source with default settings
connectionString = "Valid EventHubs connection string."
ehConf = {
'eventhubs.connectionString' : connectionString
}
df = spark \
.readStream \
.format("eventhubs") \
.options(**ehConf) \
.load()
readInStreamBody = df.withColumn("body", df["body"].cast("string"))
display(readInStreamBody)
我认为如果您使用 spark 版本 2.4.5 或更高版本以及 Azure 事件中心连接器版本 2.3,则需要稍作修改。 15或以上
对于2.3.15版本及以上版本,配置字典要求对连接字符串进行加密,因此您需要按照下面的代码片段进行传递。
connectionString = "Endpoint=sb://SAMPLE;SharedAccessKeyName=KEY_NAME;SharedAccessKey=KEY;EntityPath=EVENTHUB_NAME"
ehConf = {}
ehConf['eventhubs.connectionString'] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString)
df = spark \
.readStream \
.format("eventhubs") \
.options(**ehConf) \
.load()
readInStreamBody = df.withColumn("body", df["body"].cast("string"))
display(readInStreamBody)