将两个分区流合并为一个流
Combining two partitioned streams into one stream
我是 Azure 事件中心的新手。使用事件中心,我们从 IoT 设备接收数据,并通过分配分区号“0”和“1”将数据分成两个流。
我们需要两个流的原因是,一个用于训练 "deep learning model",另一个用于测试我们使用来自另一侧的新数据训练的模型。
这叫做"online learning"。
但是,在我们还没有训练模型的情况下,我们无法使用该模型对其进行测试,因此在这种情况下,我宁愿将两个分区的流合并为一个,而不是在这种情况下使用两个流这样就不会浪费数据。稍后一旦创建了模型,我们就可以同时返回两个流来进行测试和训练。
我找不到任何可以在事件中心脚本中组合它们的模块。有什么建议么?
如果您可以在发送到事件中心的过程中向数据添加属性,那么您可以尝试以下步骤。
1.We需要为每个事件数据设置2个属性。
对于测试数据,我们可以添加以下2个属性:
property_name:"category",它的值:"test",用来判断你接收的是哪种数据,比如考试或火车。
property_name:"seqNum",它的值为number,如0,1,2,3,用来判断顺序数据.
对于训练数据,使用上述步骤,只需将类别值更改为“训练”。
我在 C# 代码中设置了这些属性,如下所示。您可以通过自己的方式设置它而无需 c#:
for (var i = 0; i < numMessagesToSend; i++)
{
var message = "555 Message";
EventData mydata = new EventData(Encoding.UTF8.GetBytes(message));
//add properties
mydata.Properties.Add("seqNum", i);
mydata.Properties.Add("category", "test");
await eventHubClient.SendAsync(mydata);
}
然后使用下面的python代码接收数据。在这里,我定义了 2 个字典,一个用于存储测试数据,另一个用于存储训练数据。
import logging
import asyncio
import os
import sys
import signal
import functools
from azure.eventprocessorhost import (
AbstractEventProcessor,
AzureStorageCheckpointLeaseManager,
EventHubConfig,
EventProcessorHost,
EPHOptions
)
# define 2 dictionaries, to store test data and train data respectively.
dict_test={}
dict_train={}
class EventProcessor(AbstractEventProcessor):
def __init__(self, params=None):
super().__init__(params)
self._msg_counter = 0
async def open_async(self, context):
print("Connection established {}".format(context.partition_id))
async def close_async(self, context, reason):
print("Connection closed (reason {}, id {}, offset {}, sq_number {})".format(
reason,
context.partition_id,
context.offset,
context.sequence_number))
async def process_events_async(self, context, messages):
for m in messages:
data = m.body_as_str()
if m.application_properties is not None:
mycategory = m.application_properties.get(b'category').decode('utf-8')
mysequence = str(m.application_properties.get(b'seqNum'))
if mycategory == 'test':
dict_test[mysequence]=data
if mycategory == 'train':
dict_train[mysequence]=data
print("Received data: {}".format(data))
await context.checkpoint_async()
async def process_error_async(self, context, error):
print("Event Processor Error {!r}".format(error))
async def wait_and_close(host):
await asyncio.sleep(60)
await host.close_async()
try:
loop = asyncio.get_event_loop()
# Storage Account Credentials
STORAGE_ACCOUNT_NAME = "xxx"
STORAGE_KEY = "xxxx"
LEASE_CONTAINER_NAME = "xxx"
NAMESPACE = "xxx"
EVENTHUB = "xxx"
USER = "RootManageSharedAccessKey"
KEY = "xxxx"
# Eventhub config and storage manager
eh_config = EventHubConfig(NAMESPACE, EVENTHUB, USER, KEY, consumer_group="$default")
eh_options = EPHOptions()
eh_options.release_pump_on_timeout = True
eh_options.debug_trace = False
storage_manager = AzureStorageCheckpointLeaseManager(
STORAGE_ACCOUNT_NAME, STORAGE_KEY, LEASE_CONTAINER_NAME)
# Event loop and host
host = EventProcessorHost(
EventProcessor,
eh_config,
storage_manager,
ep_params=["param1","param2"],
eph_options=eh_options,
loop=loop)
tasks = asyncio.gather(
host.open_async(),
wait_and_close(host))
loop.run_until_complete(tasks)
print("***this is the data for test***")
print(dict_test)
print("***-----------------------***")
print("***this is the data for train***")
print(dict_train)
except KeyboardInterrupt:
# Canceling pending tasks and stopping the loop
for task in asyncio.Task.all_tasks():
task.cancel()
loop.run_forever()
tasks.exception()
finally:
loop.stop()
测试结果如下:
最后一步,由于测试数据/训练数据分别存放在字典中,而字典的key是序号,可以自己写代码操作字典,在里面重建测试数据/训练数据顺序。
我是 Azure 事件中心的新手。使用事件中心,我们从 IoT 设备接收数据,并通过分配分区号“0”和“1”将数据分成两个流。
我们需要两个流的原因是,一个用于训练 "deep learning model",另一个用于测试我们使用来自另一侧的新数据训练的模型。
这叫做"online learning"。
但是,在我们还没有训练模型的情况下,我们无法使用该模型对其进行测试,因此在这种情况下,我宁愿将两个分区的流合并为一个,而不是在这种情况下使用两个流这样就不会浪费数据。稍后一旦创建了模型,我们就可以同时返回两个流来进行测试和训练。
我找不到任何可以在事件中心脚本中组合它们的模块。有什么建议么?
如果您可以在发送到事件中心的过程中向数据添加属性,那么您可以尝试以下步骤。
1.We需要为每个事件数据设置2个属性。
对于测试数据,我们可以添加以下2个属性:
property_name:"category",它的值:"test",用来判断你接收的是哪种数据,比如考试或火车。
property_name:"seqNum",它的值为number,如0,1,2,3,用来判断顺序数据.
对于训练数据,使用上述步骤,只需将类别值更改为“训练”。
我在 C# 代码中设置了这些属性,如下所示。您可以通过自己的方式设置它而无需 c#:
for (var i = 0; i < numMessagesToSend; i++)
{
var message = "555 Message";
EventData mydata = new EventData(Encoding.UTF8.GetBytes(message));
//add properties
mydata.Properties.Add("seqNum", i);
mydata.Properties.Add("category", "test");
await eventHubClient.SendAsync(mydata);
}
然后使用下面的python代码接收数据。在这里,我定义了 2 个字典,一个用于存储测试数据,另一个用于存储训练数据。
import logging
import asyncio
import os
import sys
import signal
import functools
from azure.eventprocessorhost import (
AbstractEventProcessor,
AzureStorageCheckpointLeaseManager,
EventHubConfig,
EventProcessorHost,
EPHOptions
)
# define 2 dictionaries, to store test data and train data respectively.
dict_test={}
dict_train={}
class EventProcessor(AbstractEventProcessor):
def __init__(self, params=None):
super().__init__(params)
self._msg_counter = 0
async def open_async(self, context):
print("Connection established {}".format(context.partition_id))
async def close_async(self, context, reason):
print("Connection closed (reason {}, id {}, offset {}, sq_number {})".format(
reason,
context.partition_id,
context.offset,
context.sequence_number))
async def process_events_async(self, context, messages):
for m in messages:
data = m.body_as_str()
if m.application_properties is not None:
mycategory = m.application_properties.get(b'category').decode('utf-8')
mysequence = str(m.application_properties.get(b'seqNum'))
if mycategory == 'test':
dict_test[mysequence]=data
if mycategory == 'train':
dict_train[mysequence]=data
print("Received data: {}".format(data))
await context.checkpoint_async()
async def process_error_async(self, context, error):
print("Event Processor Error {!r}".format(error))
async def wait_and_close(host):
await asyncio.sleep(60)
await host.close_async()
try:
loop = asyncio.get_event_loop()
# Storage Account Credentials
STORAGE_ACCOUNT_NAME = "xxx"
STORAGE_KEY = "xxxx"
LEASE_CONTAINER_NAME = "xxx"
NAMESPACE = "xxx"
EVENTHUB = "xxx"
USER = "RootManageSharedAccessKey"
KEY = "xxxx"
# Eventhub config and storage manager
eh_config = EventHubConfig(NAMESPACE, EVENTHUB, USER, KEY, consumer_group="$default")
eh_options = EPHOptions()
eh_options.release_pump_on_timeout = True
eh_options.debug_trace = False
storage_manager = AzureStorageCheckpointLeaseManager(
STORAGE_ACCOUNT_NAME, STORAGE_KEY, LEASE_CONTAINER_NAME)
# Event loop and host
host = EventProcessorHost(
EventProcessor,
eh_config,
storage_manager,
ep_params=["param1","param2"],
eph_options=eh_options,
loop=loop)
tasks = asyncio.gather(
host.open_async(),
wait_and_close(host))
loop.run_until_complete(tasks)
print("***this is the data for test***")
print(dict_test)
print("***-----------------------***")
print("***this is the data for train***")
print(dict_train)
except KeyboardInterrupt:
# Canceling pending tasks and stopping the loop
for task in asyncio.Task.all_tasks():
task.cancel()
loop.run_forever()
tasks.exception()
finally:
loop.stop()
测试结果如下:
最后一步,由于测试数据/训练数据分别存放在字典中,而字典的key是序号,可以自己写代码操作字典,在里面重建测试数据/训练数据顺序。