使用 C# 反序列化 Avro 文件
Deserialize an Avro file with C#
我找不到使用 C# 反序列化 Apache Avro 文件的方法。 Avro 文件是由 Microsoft Azure 事件中心 Archive feature 生成的文件。
使用 Java 我可以使用 Apache 中的 Avro Tools 将文件转换为 JSON:
java -jar avro-tools-1.8.1.jar tojson --pretty inputfile > output.json
使用 NuGet 包 Microsoft.Hadoop.Avro 我能够提取 SequenceNumber
、Offset
和 EnqueuedTimeUtc
,但是因为我不'知道使用什么类型 Body
抛出异常。我试过 Dictionary<string, object>
和其他类型。
static void Main(string[] args)
{
var fileName = "...";
using (Stream stream = new FileStream(fileName, FileMode.Open, FileAccess.Read, FileShare.Read))
{
using (var reader = AvroContainer.CreateReader<EventData>(stream))
{
using (var streamReader = new SequentialReader<EventData>(reader))
{
var record = streamReader.Objects.FirstOrDefault();
}
}
}
}
[DataContract(Namespace = "Microsoft.ServiceBus.Messaging")]
public class EventData
{
[DataMember(Name = "SequenceNumber")]
public long SequenceNumber { get; set; }
[DataMember(Name = "Offset")]
public string Offset { get; set; }
[DataMember(Name = "EnqueuedTimeUtc")]
public string EnqueuedTimeUtc { get; set; }
[DataMember(Name = "Body")]
public foo Body { get; set; }
// More properties...
}
架构如下所示:
{
"type": "record",
"name": "EventData",
"namespace": "Microsoft.ServiceBus.Messaging",
"fields": [
{
"name": "SequenceNumber",
"type": "long"
},
{
"name": "Offset",
"type": "string"
},
{
"name": "EnqueuedTimeUtc",
"type": "string"
},
{
"name": "SystemProperties",
"type": {
"type": "map",
"values": [ "long", "double", "string", "bytes" ]
}
},
{
"name": "Properties",
"type": {
"type": "map",
"values": [ "long", "double", "string", "bytes" ]
}
},
{
"name": "Body",
"type": [ "null", "bytes" ]
}
]
}
你剩下的类型,我怀疑应该定义为:
[DataContract(Namespace = "Microsoft.ServiceBus.Messaging")]
[KnownType(typeof(Dictionary<string, object>))]
public class EventData
{
[DataMember]
public IDictionary<string, object> SystemProperties { get; set; }
[DataMember]
public IDictionary<string, object> Properties { get; set; }
[DataMember]
public byte[] Body { get; set; }
}
即使 Body
是 null
和 bytes
的联合,它映射到 nullable
byte[]
.
在 C# 中,数组始终是引用类型,因此可以是 null
并且合同已履行。
我能够使用 dynamic
获得完整的数据访问权限。下面是用于访问原始 body
数据的代码,该数据存储为字节数组。在我的例子中,这些字节包含 UTF8 编码的 JSON,但这当然取决于您最初如何创建您发布到事件中心的 EventData
实例:
using (var reader = AvroContainer.CreateGenericReader(stream))
{
while (reader.MoveNext())
{
foreach (dynamic record in reader.Current.Objects)
{
var sequenceNumber = record.SequenceNumber;
var bodyText = Encoding.UTF8.GetString(record.Body);
Console.WriteLine($"{sequenceNumber}: {bodyText}");
}
}
}
如果有人可以 post 静态类型的解决方案,我会投赞成票,但考虑到任何系统中较大的延迟几乎肯定是与事件中心存档 blob 的连接,我不会不用担心解析性能。 :)
我终于能够让它与 Apache C# 库/框架一起工作。
我卡住了一段时间,因为 Azure 事件中心的捕获功能有时会输出一个没有任何消息内容的文件。
我可能还对消息最初序列化到 EventData 对象的方式有疑问。
下面的代码用于从捕获 blob 容器保存到磁盘的文件。
var dataFileReader = DataFileReader<EventData>.OpenReader(file);
foreach (var record in dataFileReader.NextEntries)
{
// Do work on EventData object
}
这也适用于使用 GenericRecord 对象。
var dataFileReader = DataFileReader<GenericRecord>.OpenReader(file);
这需要一些努力才能弄明白。但是我现在同意这个 Azure 事件中心捕获功能是备份所有事件的一个很棒的功能。我仍然觉得他们应该像流分析作业输出那样使格式可选,但也许我会习惯 Avro。
此 Gist 展示了如何使用 Microsoft.Hadoop.Avro2 使用 C# 反序列化事件中心捕获,Avro2 具有同时兼容 .NET Framework 4.5 和 .NET Standard 1.6 的优点:
var connectionString = "<Azure event hub capture storage account connection string>";
var containerName = "<Azure event hub capture container name>";
var blobName = "<Azure event hub capture BLOB name (ends in .avro)>";
var storageAccount = CloudStorageAccount.Parse(connectionString);
var blobClient = storageAccount.CreateCloudBlobClient();
var container = blobClient.GetContainerReference(containerName);
var blob = container.GetBlockBlobReference(blobName);
using (var stream = blob.OpenRead())
using (var reader = AvroContainer.CreateGenericReader(stream))
while (reader.MoveNext())
foreach (dynamic result in reader.Current.Objects)
{
var record = new AvroEventData(result);
record.Dump();
}
public struct AvroEventData
{
public AvroEventData(dynamic record)
{
SequenceNumber = (long) record.SequenceNumber;
Offset = (string) record.Offset;
DateTime.TryParse((string) record.EnqueuedTimeUtc, out var enqueuedTimeUtc);
EnqueuedTimeUtc = enqueuedTimeUtc;
SystemProperties = (Dictionary<string, object>) record.SystemProperties;
Properties = (Dictionary<string, object>) record.Properties;
Body = (byte[]) record.Body;
}
public long SequenceNumber { get; set; }
public string Offset { get; set; }
public DateTime EnqueuedTimeUtc { get; set; }
public Dictionary<string, object> SystemProperties { get; set; }
public Dictionary<string, object> Properties { get; set; }
public byte[] Body { get; set; }
}
NuGet 引用:
- Microsoft.Hadoop.Avro2(1.2.1 有效)
- WindowsAzure.Storage(8.3.0 有效)
命名空间:
- Microsoft.Hadoop.Avro.Container
- 微软。WindowsAzure.Storage
您还可以使用 NullableSchema
属性将 Body 标记为字节和 null 的并集。这将允许您使用强类型接口。
[DataContract(Namespace = "Microsoft.ServiceBus.Messaging")]
public class EventData
{
[DataMember(Name = "SequenceNumber")]
public long SequenceNumber { get; set; }
[DataMember(Name = "Offset")]
public string Offset { get; set; }
[DataMember(Name = "EnqueuedTimeUtc")]
public string EnqueuedTimeUtc { get; set; }
[DataMember(Name = "Body")]
[NullableSchema]
public foo Body { get; set; }
}
我建议您使用 https://github.com/AdrianStrugala/AvroConvert
简单地说:
byte[] avroFileContent = File.ReadAllBytes(fileName);
var result = AvroConvert.Deserialize<EventData>(avroFileContent);
库本身旨在通过使用 Avro 格式改进开发流程。您甚至不需要模型上的架构或属性。 (我是这个图书馆的贡献者)
我找不到使用 C# 反序列化 Apache Avro 文件的方法。 Avro 文件是由 Microsoft Azure 事件中心 Archive feature 生成的文件。
使用 Java 我可以使用 Apache 中的 Avro Tools 将文件转换为 JSON:
java -jar avro-tools-1.8.1.jar tojson --pretty inputfile > output.json
使用 NuGet 包 Microsoft.Hadoop.Avro 我能够提取 SequenceNumber
、Offset
和 EnqueuedTimeUtc
,但是因为我不'知道使用什么类型 Body
抛出异常。我试过 Dictionary<string, object>
和其他类型。
static void Main(string[] args)
{
var fileName = "...";
using (Stream stream = new FileStream(fileName, FileMode.Open, FileAccess.Read, FileShare.Read))
{
using (var reader = AvroContainer.CreateReader<EventData>(stream))
{
using (var streamReader = new SequentialReader<EventData>(reader))
{
var record = streamReader.Objects.FirstOrDefault();
}
}
}
}
[DataContract(Namespace = "Microsoft.ServiceBus.Messaging")]
public class EventData
{
[DataMember(Name = "SequenceNumber")]
public long SequenceNumber { get; set; }
[DataMember(Name = "Offset")]
public string Offset { get; set; }
[DataMember(Name = "EnqueuedTimeUtc")]
public string EnqueuedTimeUtc { get; set; }
[DataMember(Name = "Body")]
public foo Body { get; set; }
// More properties...
}
架构如下所示:
{
"type": "record",
"name": "EventData",
"namespace": "Microsoft.ServiceBus.Messaging",
"fields": [
{
"name": "SequenceNumber",
"type": "long"
},
{
"name": "Offset",
"type": "string"
},
{
"name": "EnqueuedTimeUtc",
"type": "string"
},
{
"name": "SystemProperties",
"type": {
"type": "map",
"values": [ "long", "double", "string", "bytes" ]
}
},
{
"name": "Properties",
"type": {
"type": "map",
"values": [ "long", "double", "string", "bytes" ]
}
},
{
"name": "Body",
"type": [ "null", "bytes" ]
}
]
}
你剩下的类型,我怀疑应该定义为:
[DataContract(Namespace = "Microsoft.ServiceBus.Messaging")]
[KnownType(typeof(Dictionary<string, object>))]
public class EventData
{
[DataMember]
public IDictionary<string, object> SystemProperties { get; set; }
[DataMember]
public IDictionary<string, object> Properties { get; set; }
[DataMember]
public byte[] Body { get; set; }
}
即使 Body
是 null
和 bytes
的联合,它映射到 nullable
byte[]
.
在 C# 中,数组始终是引用类型,因此可以是 null
并且合同已履行。
我能够使用 dynamic
获得完整的数据访问权限。下面是用于访问原始 body
数据的代码,该数据存储为字节数组。在我的例子中,这些字节包含 UTF8 编码的 JSON,但这当然取决于您最初如何创建您发布到事件中心的 EventData
实例:
using (var reader = AvroContainer.CreateGenericReader(stream))
{
while (reader.MoveNext())
{
foreach (dynamic record in reader.Current.Objects)
{
var sequenceNumber = record.SequenceNumber;
var bodyText = Encoding.UTF8.GetString(record.Body);
Console.WriteLine($"{sequenceNumber}: {bodyText}");
}
}
}
如果有人可以 post 静态类型的解决方案,我会投赞成票,但考虑到任何系统中较大的延迟几乎肯定是与事件中心存档 blob 的连接,我不会不用担心解析性能。 :)
我终于能够让它与 Apache C# 库/框架一起工作。
我卡住了一段时间,因为 Azure 事件中心的捕获功能有时会输出一个没有任何消息内容的文件。
我可能还对消息最初序列化到 EventData 对象的方式有疑问。
下面的代码用于从捕获 blob 容器保存到磁盘的文件。
var dataFileReader = DataFileReader<EventData>.OpenReader(file);
foreach (var record in dataFileReader.NextEntries)
{
// Do work on EventData object
}
这也适用于使用 GenericRecord 对象。
var dataFileReader = DataFileReader<GenericRecord>.OpenReader(file);
这需要一些努力才能弄明白。但是我现在同意这个 Azure 事件中心捕获功能是备份所有事件的一个很棒的功能。我仍然觉得他们应该像流分析作业输出那样使格式可选,但也许我会习惯 Avro。
此 Gist 展示了如何使用 Microsoft.Hadoop.Avro2 使用 C# 反序列化事件中心捕获,Avro2 具有同时兼容 .NET Framework 4.5 和 .NET Standard 1.6 的优点:
var connectionString = "<Azure event hub capture storage account connection string>";
var containerName = "<Azure event hub capture container name>";
var blobName = "<Azure event hub capture BLOB name (ends in .avro)>";
var storageAccount = CloudStorageAccount.Parse(connectionString);
var blobClient = storageAccount.CreateCloudBlobClient();
var container = blobClient.GetContainerReference(containerName);
var blob = container.GetBlockBlobReference(blobName);
using (var stream = blob.OpenRead())
using (var reader = AvroContainer.CreateGenericReader(stream))
while (reader.MoveNext())
foreach (dynamic result in reader.Current.Objects)
{
var record = new AvroEventData(result);
record.Dump();
}
public struct AvroEventData
{
public AvroEventData(dynamic record)
{
SequenceNumber = (long) record.SequenceNumber;
Offset = (string) record.Offset;
DateTime.TryParse((string) record.EnqueuedTimeUtc, out var enqueuedTimeUtc);
EnqueuedTimeUtc = enqueuedTimeUtc;
SystemProperties = (Dictionary<string, object>) record.SystemProperties;
Properties = (Dictionary<string, object>) record.Properties;
Body = (byte[]) record.Body;
}
public long SequenceNumber { get; set; }
public string Offset { get; set; }
public DateTime EnqueuedTimeUtc { get; set; }
public Dictionary<string, object> SystemProperties { get; set; }
public Dictionary<string, object> Properties { get; set; }
public byte[] Body { get; set; }
}
NuGet 引用:
- Microsoft.Hadoop.Avro2(1.2.1 有效)
- WindowsAzure.Storage(8.3.0 有效)
命名空间:
- Microsoft.Hadoop.Avro.Container
- 微软。WindowsAzure.Storage
您还可以使用 NullableSchema
属性将 Body 标记为字节和 null 的并集。这将允许您使用强类型接口。
[DataContract(Namespace = "Microsoft.ServiceBus.Messaging")]
public class EventData
{
[DataMember(Name = "SequenceNumber")]
public long SequenceNumber { get; set; }
[DataMember(Name = "Offset")]
public string Offset { get; set; }
[DataMember(Name = "EnqueuedTimeUtc")]
public string EnqueuedTimeUtc { get; set; }
[DataMember(Name = "Body")]
[NullableSchema]
public foo Body { get; set; }
}
我建议您使用 https://github.com/AdrianStrugala/AvroConvert
简单地说:
byte[] avroFileContent = File.ReadAllBytes(fileName);
var result = AvroConvert.Deserialize<EventData>(avroFileContent);
库本身旨在通过使用 Avro 格式改进开发流程。您甚至不需要模型上的架构或属性。 (我是这个图书馆的贡献者)