如何将事件从文件发送到事件中心
How to send events from file to event hub
我是 EventHub 的新手,在我的项目中使用 Python 脚本将事件一个一个地发送到 Eventhub。是否可以将所有事件保存在一个文件中并将它们一起发送到 EventHub?
我想要实现的是:每秒向 EventHub 发送数千个事件。所以我可以将所有的 events/messages 保存在一个文件中,然后一次性发送。
请提出实现此目标的方法。
这是您可以用来批量发送事件的代码
#!/usr/bin/env python
"""
An example to show batch sending events to an Event Hub.
"""
# pylint: disable=C0111
import sys
import logging
import datetime
import time
import os
from azure.eventhub import EventHubClient, Sender, EventData
import examples
# Module-level logger from the samples helper package; INFO level so the
# per-message "Yielding message N" lines below are visible.
logger = examples.get_logger(logging.INFO)
# Address can be in either of these formats:
# "amqps://<URL-encoded-SAS-policy>:<URL-encoded-SAS-key>@<mynamespace>.servicebus.windows.net/myeventhub"
# "amqps://<mynamespace>.servicebus.windows.net/myeventhub"
ADDRESS = os.environ.get('EVENT_HUB_ADDRESS')
# SAS policy and key are not required if they are encoded in the URL
USER = os.environ.get('EVENT_HUB_SAS_POLICY')
KEY = os.environ.get('EVENT_HUB_SAS_KEY')
def data_generator():
    """Yield 1500 identical payloads, logging each one as it is produced."""
    count = 0
    while count < 1500:
        logger.info("Yielding message {}".format(count))
        yield b"Hello world"
        count += 1
try:
    if not ADDRESS:
        raise ValueError("No EventHubs URL supplied.")

    # Connect and attach a sender bound to partition "1".
    client = EventHubClient(ADDRESS, debug=False, username=USER, password=KEY)
    sender = client.add_sender(partition="1")
    client.run()
    try:
        start_time = time.time()
        # EventData(batch=...) wraps the generator so all 1500 messages are
        # transferred as one batched send instead of one call per event.
        data = EventData(batch=data_generator())
        sender.send(data)
    finally:
        # The original code had a bare `except: raise` here, which is a no-op;
        # try/finally alone gives identical behavior. Always stop the client
        # and report elapsed time, even when the send fails.
        end_time = time.time()
        client.stop()
        run_time = end_time - start_time
        logger.info("Runtime: {} seconds".format(run_time))
except KeyboardInterrupt:
    pass
希望对您有所帮助。
对于最新版本(5.2.0)的库(见 GitHub 仓库与参考文档),您可以批量发送事件并像这样设置正文和属性:
from azure.eventhub import EventHubProducerClient, EventHubConsumerClient, EventData
import json
connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

client = EventHubProducerClient.from_connection_string(connection_str, eventhub_name=eventhub_name)

# Fill a single batch with as many events as it can hold.
event_data_batch = client.create_batch()
while True:
    payload = json.dumps({"LocationId": "123", "userId": "123"})
    message = EventData(body=payload)  # body can be of type `str` or `bytes`
    message.properties = {"Type": "iPhone"}
    try:
        event_data_batch.add(message)
    except ValueError:
        # EventDataBatch object reached max_size; stop adding and send it.
        break

with client:
    client.send_batch(event_data_batch)
消费事件:
consumer_group = "$Default"
client = EventHubConsumerClient.from_connection_string(
    connection_str, consumer_group, eventhub_name=eventhub_name
)

def on_event_batch(partition_context, events):
    """Checkpoint the received batch, then print each event's body and properties."""
    partition_context.update_checkpoint()
    for event in events:
        print(event.body_as_str())
        print("properties={}".format(event.properties))

with client:
    client.receive_batch(
        on_event_batch=on_event_batch,
        starting_position="-1",  # "-1" means read from the beginning of the partition.
    )
    # To receive from a single partition instead:
    # client.receive(on_event=on_event, partition_id='0')
我是 EventHub 的新手,在我的项目中使用 Python 脚本将事件一个一个地发送到 Eventhub。是否可以将所有事件保存在一个文件中并将它们一起发送到 EventHub?
我想要实现的是:每秒向 EventHub 发送数千个事件。所以我可以将所有的 events/messages 保存在一个文件中,然后一次性发送。
请提出实现此目标的方法。
这是您可以用来批量发送事件的代码
#!/usr/bin/env python
"""
An example to show batch sending events to an Event Hub.
"""
# pylint: disable=C0111
import sys
import logging
import datetime
import time
import os
from azure.eventhub import EventHubClient, Sender, EventData
import examples
# Module-level logger from the samples helper package; INFO level so the
# per-message "Yielding message N" lines below are visible.
logger = examples.get_logger(logging.INFO)
# Address can be in either of these formats:
# "amqps://<URL-encoded-SAS-policy>:<URL-encoded-SAS-key>@<mynamespace>.servicebus.windows.net/myeventhub"
# "amqps://<mynamespace>.servicebus.windows.net/myeventhub"
ADDRESS = os.environ.get('EVENT_HUB_ADDRESS')
# SAS policy and key are not required if they are encoded in the URL
USER = os.environ.get('EVENT_HUB_SAS_POLICY')
KEY = os.environ.get('EVENT_HUB_SAS_KEY')
def data_generator():
    """Yield 1500 identical payloads, logging each one as it is produced."""
    count = 0
    while count < 1500:
        logger.info("Yielding message {}".format(count))
        yield b"Hello world"
        count += 1
try:
    if not ADDRESS:
        raise ValueError("No EventHubs URL supplied.")

    # Connect and attach a sender bound to partition "1".
    client = EventHubClient(ADDRESS, debug=False, username=USER, password=KEY)
    sender = client.add_sender(partition="1")
    client.run()
    try:
        start_time = time.time()
        # EventData(batch=...) wraps the generator so all 1500 messages are
        # transferred as one batched send instead of one call per event.
        data = EventData(batch=data_generator())
        sender.send(data)
    finally:
        # The original code had a bare `except: raise` here, which is a no-op;
        # try/finally alone gives identical behavior. Always stop the client
        # and report elapsed time, even when the send fails.
        end_time = time.time()
        client.stop()
        run_time = end_time - start_time
        logger.info("Runtime: {} seconds".format(run_time))
except KeyboardInterrupt:
    pass
希望对您有所帮助。
对于最新版本(5.2.0)的库(见 GitHub 仓库与参考文档),您可以批量发送事件并像这样设置正文和属性:
from azure.eventhub import EventHubProducerClient, EventHubConsumerClient, EventData
import json
connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

client = EventHubProducerClient.from_connection_string(connection_str, eventhub_name=eventhub_name)

# Fill a single batch with as many events as it can hold.
event_data_batch = client.create_batch()
while True:
    payload = json.dumps({"LocationId": "123", "userId": "123"})
    message = EventData(body=payload)  # body can be of type `str` or `bytes`
    message.properties = {"Type": "iPhone"}
    try:
        event_data_batch.add(message)
    except ValueError:
        # EventDataBatch object reached max_size; stop adding and send it.
        break

with client:
    client.send_batch(event_data_batch)
消费事件:
consumer_group = "$Default"
client = EventHubConsumerClient.from_connection_string(
    connection_str, consumer_group, eventhub_name=eventhub_name
)

def on_event_batch(partition_context, events):
    """Checkpoint the received batch, then print each event's body and properties."""
    partition_context.update_checkpoint()
    for event in events:
        print(event.body_as_str())
        print("properties={}".format(event.properties))

with client:
    client.receive_batch(
        on_event_batch=on_event_batch,
        starting_position="-1",  # "-1" means read from the beginning of the partition.
    )
    # To receive from a single partition instead:
    # client.receive(on_event=on_event, partition_id='0')