使用 pySpark 在 Azure Databricks 中使用来自 EventHub 的事件

Consume events from EventHub In Azure Databricks using pySpark

我可以看到在 Azure Databricks 中使用 Scala 从事件中心消费事件的 spark 连接器和指南。

但是,我们如何使用 pySpark 从 azure databricks 中使用事件中心中的事件?

任何suggestions/documentation细节都会有所帮助。谢谢

下面是从 pyspark on azure data-bricks 的事件中心读取事件的代码段 data-bricks。

// With an entity path 
val with = "Endpoint=sb://SAMPLE;SharedAccessKeyName=KEY_NAME;SharedAccessKey=KEY;EntityPath=EVENTHUB_NAME"


# Source with default settings
connectionString = "Valid EventHubs connection string."
ehConf = {
  'eventhubs.connectionString' : connectionString
}

df = spark \
  .readStream \
  .format("eventhubs") \
  .options(**ehConf) \
  .load()

readInStreamBody = df.withColumn("body", df["body"].cast("string"))
display(readInStreamBody)

我认为如果您使用 spark 版本 2.4.5 或更高版本以及 Azure 事件中心连接器版本 2.3,则需要稍作修改。 15或以上

对于2.3.15版本及以上版本,配置字典要求对连接字符串进行加密,因此您需要按照下面的代码片段进行传递。

connectionString = "Endpoint=sb://SAMPLE;SharedAccessKeyName=KEY_NAME;SharedAccessKey=KEY;EntityPath=EVENTHUB_NAME"
ehConf = {}
ehConf['eventhubs.connectionString'] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString)

df = spark \
  .readStream \
  .format("eventhubs") \
  .options(**ehConf) \
  .load()

readInStreamBody = df.withColumn("body", df["body"].cast("string"))
display(readInStreamBody)