将 XML 文件内容发送到事件中心并从 Databricks 读取它
Sending XML file content to Event Hub and read it from Databricks
我正在尝试将 xml 个文件(小于 100 kb)发送到 Azure 事件中心,然后在发送后读取 Databricks 中的事件。
现在我已经使用Python SDK 以字节为单位发送了XML 的内容(这一步WORKS)。但下一步我想实现的是从事件的“主体”读取 XML 内容并使用 PYSPARK.
创建一个 Spark Dataframe
能做到这一点,我有两个疑惑:
1- Is there any option where I specify in the spark.readStream
option that the content of the "body" of the event is an XML?
2- Is there any alternative to dump that content directly to a Spark Dataframe?
3- I'm missing some configuration when sending the XML as events?
我试过下面的例子:
Python 事件制作者
# this is the python event hub message producer
import asyncio
from azure.eventhub.aio import EventHubProducerClient
from azure.eventhub import EventData
import xml.etree.ElementTree as ET
from lxml import etree
from pathlib import Path
connection_str= "Endpoint_str"
eventhub_name = "eventhub_name"
xml_path = Path("path/to/xmlfile.xml")
xml_data = ET.parse(xml_path)
tree = xml_data.getroot()
data = ET.tostring(tree)
async def run():
# Create a producer client to send messages to the event hub.
# Specify a connection string to your event hubs namespace and
# the event hub name.
producer = EventHubProducerClient.from_connection_string(conn_str=connection_str, eventhub_name=eventhub_name)
async with producer:
# Create a batch.
event_data_batch = await producer.create_batch()
# Add events to the batch.
event_data_batch.add(EventData(data))
# Send the batch of events to the event hub.
await producer.send_batch(event_data_batch)
loop = asyncio.get_event_loop()
loop.run_until_complete(run())
事件reader
stream_data = spark \
.readStream \
.format('eventhubs') \
.options(**event_hub_conf) \
.option('multiLine', True) \
.option('mode', 'PERMISSIVE') \
.load()
谢谢!!!
所以我终于想到了下一个从事件中心正文读取 XML 的方法。
首先我使用import xml.etree.ElementTree as ET
库来解析XML结构。
stream_data = spark \
.readStream \
.format('eventhubs') \
.options(**event_hub_conf) \
.option('multiLine', True) \
.option('mode', 'PERMISSIVE') \
.load() \
.select("body")
df = stream_data.withColumn("body", stream_data["body"].cast("string"))
import xml.etree.ElementTree as ET
import json
def returnV(col):
elem_dict= {}
tag_list = [
'./TAG/Document/id',
'./TAG/Document/car',
'./TAG/Document/motor',
'./Metadata/Date']
root = ET.fromstring(col)
for tag in tag_list:
for item in root.findall(tag):
elem_dict[item.tag] = item.text
return json.dumps(elem_dict)
我有一些嵌套的标签,通过这种方法我提取了所有需要的值并将它们作为 JSON 返回。我所了解到的是,如果传入模式可以更改,则结构化流不是解决方案。所以我只采用了那些我知道它们不会随时间改变的值。
然后,一旦定义了方法,我将其注册为 UDF。
extractValuesFromXML = udf(returnV)
XML_DF= df.withColumn("body",extractValuesFromXML("body"))
然后最后我只使用 get_json_object
函数来提取 JSON
的值
input_parsed_df = XML_DF.select(
get_json_object("body", "$.id").alias("id").cast('integer'),
get_json_object("body", "$.car").alias("car"),
get_json_object("body", "$.motor").alias("motor"),
get_json_object("body", "$.Date").alias("Date")
)
我正在尝试将 xml 个文件(小于 100 kb)发送到 Azure 事件中心,然后在发送后读取 Databricks 中的事件。
现在我已经使用Python SDK 以字节为单位发送了XML 的内容(这一步WORKS)。但下一步我想实现的是从事件的“主体”读取 XML 内容并使用 PYSPARK.
创建一个 Spark Dataframe能做到这一点,我有两个疑惑:
1- Is there any option where I specify in the
spark.readStream
option that the content of the "body" of the event is an XML?
2- Is there any alternative to dump that content directly to a Spark Dataframe?
3- I'm missing some configuration when sending the XML as events?
我试过下面的例子:
Python 事件制作者
# this is the python event hub message producer
import asyncio
from azure.eventhub.aio import EventHubProducerClient
from azure.eventhub import EventData
import xml.etree.ElementTree as ET
from lxml import etree
from pathlib import Path
connection_str= "Endpoint_str"
eventhub_name = "eventhub_name"
xml_path = Path("path/to/xmlfile.xml")
xml_data = ET.parse(xml_path)
tree = xml_data.getroot()
data = ET.tostring(tree)
async def run():
# Create a producer client to send messages to the event hub.
# Specify a connection string to your event hubs namespace and
# the event hub name.
producer = EventHubProducerClient.from_connection_string(conn_str=connection_str, eventhub_name=eventhub_name)
async with producer:
# Create a batch.
event_data_batch = await producer.create_batch()
# Add events to the batch.
event_data_batch.add(EventData(data))
# Send the batch of events to the event hub.
await producer.send_batch(event_data_batch)
loop = asyncio.get_event_loop()
loop.run_until_complete(run())
事件reader
stream_data = spark \
.readStream \
.format('eventhubs') \
.options(**event_hub_conf) \
.option('multiLine', True) \
.option('mode', 'PERMISSIVE') \
.load()
谢谢!!!
所以我终于想到了下一个从事件中心正文读取 XML 的方法。
首先我使用import xml.etree.ElementTree as ET
库来解析XML结构。
stream_data = spark \
.readStream \
.format('eventhubs') \
.options(**event_hub_conf) \
.option('multiLine', True) \
.option('mode', 'PERMISSIVE') \
.load() \
.select("body")
df = stream_data.withColumn("body", stream_data["body"].cast("string"))
import xml.etree.ElementTree as ET
import json
def returnV(col):
elem_dict= {}
tag_list = [
'./TAG/Document/id',
'./TAG/Document/car',
'./TAG/Document/motor',
'./Metadata/Date']
root = ET.fromstring(col)
for tag in tag_list:
for item in root.findall(tag):
elem_dict[item.tag] = item.text
return json.dumps(elem_dict)
我有一些嵌套的标签,通过这种方法我提取了所有需要的值并将它们作为 JSON 返回。我所了解到的是,如果传入模式可以更改,则结构化流不是解决方案。所以我只采用了那些我知道它们不会随时间改变的值。
然后,一旦定义了方法,我将其注册为 UDF。
extractValuesFromXML = udf(returnV)
XML_DF= df.withColumn("body",extractValuesFromXML("body"))
然后最后我只使用 get_json_object
函数来提取 JSON
input_parsed_df = XML_DF.select(
get_json_object("body", "$.id").alias("id").cast('integer'),
get_json_object("body", "$.car").alias("car"),
get_json_object("body", "$.motor").alias("motor"),
get_json_object("body", "$.Date").alias("Date")
)