serialise/deserialise 条消息在 EventStore/EventHub 中

serialise/deserialise messages in EventStore/EventHub

如果 events/messages 的生产者和消费者都是基于 .Net/C# 的,我倾向于在有效负载中使用元数据,以便能够将数据反序列化为 C# POCO,如下所示:

Data
{
  "X": {
    "a": "bb811ea5-6993-e511-80fc-1458d043a750",
    "b": "ddd",
    "b": "dddd",
    "d": true
  }
  "x1": 1.1234,
  "x2": 2.3456,
  "EventUtcDateTime": "2016-02-16T08:55:38.5103574Z"
}

Metadata
{
  "TimeStamp": "02/16/2016 08:55:37",
  "EventClrTypeName": "Bla.Di.Bla.SomeClass, Bla.Di.Bla, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null"
}

在生产者不是 .Net/C# 的情况下,什么是好的解决方案?

对我来说,合乎逻辑的答案是在不同上下文共享的任何 JSON 事件中添加强制性 EventType。

因此 EventType 应该是数据而不是元数据的强制性部分。

EventData class contains a property Properties... 允许您向邮件添加元数据:

Gets the user properties of the event data that the user explicitly added during send operations.

所以发送事件:

var eventHubClient = EventHubClient.CreateFromConnectionString("connectionString", "eventHubName");
var mypoco = new POCO();
// ...

// Get the Json string
var stringBody = JsonConvert.SerializeObject(mypoco);

// Create the event data
var eventData = new EventData(Encoding.UTF8.GetBytes(stringBody));

// Add the event type.
eventData.Properties.Add("EventType", typeof(POCO).Assembly.FullName);

// Send the data.
eventHubClient.Send(eventData);

收到消息时,您将从消息的元数据中获取事件类型:

async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
    foreach (EventData eventData in messages)
    {
        var jsonBody = Encoding.UTF8.GetString(eventData.GetBytes());

        //Get the event type
        var eventTypeName = (string)eventData.Properties["EventType"];
        var eventType = Type.GetType(eventTypeName);

        // Deserialize the object
        var myPoco = JsonConvert.DeserializeObject(jsonBody, eventType);
    }
}

否则,您可以使用 JObject

摆脱体型
async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
    foreach (EventData eventData in messages)
    {
        var jsonBody = Encoding.UTF8.GetString(eventData.GetBytes());

        // Deserialize the json as a JObject
        var myPoco = JObject.Parse(jsonBody);
        var a = myPoco["X"]["a"];

    }
}