从 EventProcessorHost 获取数据
Getting your data out of an EventProcessorHost
我对使用 EventProcessorHost 和 IEventProcessor 还很陌生,我想弄清楚如何从 EventProcessorClass 中获取数据。如果我只想将新消息记录到控制台,我目前已准备就绪并可以正常工作。
我当前的实现(我什至不确定它是否可以接受或什至是好的做法)创建一个静态变量,然后在数据进入时将其存储在其中,以便另一个处理器可以收集它。这样可以吗?是否有更好更简洁的方式来访问数据?
这是我目前所拥有的(锁定机制非常基本,当我让其余代码正常工作时将被修复):
internal class Receiver
{
public static List<string> incommingMessagesList = new List<string>();
public static bool fIsDataListLocked = false;
private EventProcessorHost m_EPHClient;
...
Console.WriteLine( "Registering EventProcessor..." );
await m_EPHClient.RegisterEventProcessorAsync<SimpleEventProcessor>();
}
public class SimpleEventProcessor : IEventProcessor
{
...
public Task ProcessEventsAsync( PartitionContext context, IEnumerable<EventData> messages )
{
foreach( var eventData in messages )
{
while( !Receiver.fIsDataListLocked )
{
Receiver.fIsDataListLocked = true ;
Receiver.incommingMessagesList.Add( Encoding.UTF8.GetString( eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count ) );
Receiver.fIsDataListLocked = false ;
}
}
return context.CheckpointAsync();
}
}
更新:
根据要求提供更多信息:
基本上我从管道的两个不同端提取数据以验证所有消息通过并跟踪它们的吞吐量,一端是 eventhub,但另一端作为 HTTP 请求来自 lwm2m 服务器。所以我有一个控制器进程 运行 需要从两端获取数据以便 clean/analyze 数据。就像我说的,我是事件处理器的新手,但让 EventProcessorHost 处理收集两组数据然后 cleaning/analyzing 对我来说没有意义。我绝对可以改变以这种方式做事,但它看起来很笨重。
在典型情况下,事件处理器以最快的方式接收和保存数据。多个事件处理器实例将从不同的 EventHub 分区读取数据。
在您的情况下,您希望将数据发送到其他地方,并在那里结合该数据的另一个流进行处理。像 List 这样的内存集合可能不是执行此操作的最佳方法:
- 它需要是线程安全的
- 崩溃时数据会丢失
- 您将需要手动删除已处理的数据,以防止集合不断增长
您将需要某种生产者/消费者实施。
一种可能的解决方案是将两个数据流都写入一个目标,例如 Azure 存储队列。这样做的主要优点是,当发生故障时,所有数据仍然存在并且不会丢失。您的最终处理器可以以自己的速度从队列中读取。
我对使用 EventProcessorHost 和 IEventProcessor 还很陌生,我想弄清楚如何从 EventProcessorClass 中获取数据。如果我只想将新消息记录到控制台,我目前已准备就绪并可以正常工作。
我当前的实现(我什至不确定它是否可以接受或什至是好的做法)创建一个静态变量,然后在数据进入时将其存储在其中,以便另一个处理器可以收集它。这样可以吗?是否有更好更简洁的方式来访问数据?
这是我目前所拥有的(锁定机制非常基本,当我让其余代码正常工作时将被修复):
internal class Receiver
{
public static List<string> incommingMessagesList = new List<string>();
public static bool fIsDataListLocked = false;
private EventProcessorHost m_EPHClient;
...
Console.WriteLine( "Registering EventProcessor..." );
await m_EPHClient.RegisterEventProcessorAsync<SimpleEventProcessor>();
}
public class SimpleEventProcessor : IEventProcessor
{
...
public Task ProcessEventsAsync( PartitionContext context, IEnumerable<EventData> messages )
{
foreach( var eventData in messages )
{
while( !Receiver.fIsDataListLocked )
{
Receiver.fIsDataListLocked = true ;
Receiver.incommingMessagesList.Add( Encoding.UTF8.GetString( eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count ) );
Receiver.fIsDataListLocked = false ;
}
}
return context.CheckpointAsync();
}
}
更新:
根据要求提供更多信息:
基本上我从管道的两个不同端提取数据以验证所有消息通过并跟踪它们的吞吐量,一端是 eventhub,但另一端作为 HTTP 请求来自 lwm2m 服务器。所以我有一个控制器进程 运行 需要从两端获取数据以便 clean/analyze 数据。就像我说的,我是事件处理器的新手,但让 EventProcessorHost 处理收集两组数据然后 cleaning/analyzing 对我来说没有意义。我绝对可以改变以这种方式做事,但它看起来很笨重。
在典型情况下,事件处理器以最快的方式接收和保存数据。多个事件处理器实例将从不同的 EventHub 分区读取数据。
在您的情况下,您希望将数据发送到其他地方,并在那里结合该数据的另一个流进行处理。像 List 这样的内存集合可能不是执行此操作的最佳方法:
- 它需要是线程安全的
- 崩溃时数据会丢失
- 您将需要手动删除已处理的数据,以防止集合不断增长
您将需要某种生产者/消费者实施。
一种可能的解决方案是将两个数据流都写入一个目标,例如 Azure 存储队列。这样做的主要优点是,当发生故障时,所有数据仍然存在并且不会丢失。您的最终处理器可以以自己的速度从队列中读取。