将模式添加到 Avro 片段 .Net
Add Schema To Avro Fragment .Net
问题如下。
步骤:
- 应用程序将一些自定义 object 转换为 avro 片段(字节数组);
- 此 avro 片段被发送到 EventData 中的事件中心 object;
- 事件中心触发一个从事件中心接收 Mcrosoft.ServiceBus.Messaging.EventData 的 azure 函数;
- 我可以提取 EventData 的 body,它包含点 1 的 avro 片段(字节数组)。
我正在使用 Microsoft.Hadoop.Avro。
我有原始自定义 object(第 1 点)的架构,所以我尝试创建一个从 avro 片段读取的通用 reader,但我收到以下错误:
Invalid Avro object container in a stream. The header cannot be recognized.
似乎 Microsoft.Hadoop.Avro 只能管理完整的 avro 文件 (header + schema + body) 而不是 avro 片段 (body)。
使用 java avro-tool 我可以将模式添加到 avro 片段。在 .Net 或 .Net Core 中也可以吗?我该怎么办?
为简化以下代码,我用相关的 avro 文件替换了来自事件中心的 EventData。
using (Stream stream = new FileStream(@"...\trip-real-0-2019-03-14-12-14.avro", FileMode.Open, FileAccess.Read, FileShare.Read))
{
// create a generic reader for the event hub avro message
using (var reader = AvroContainer.CreateGenericReader(stream))
{
while (reader.MoveNext())
{
foreach (dynamic record in reader.Current.Objects)
{
//get the body of the event hub message (fragment avro bytes)
var avroFragmentByeArray = (byte[])(record.Body);
// try to create a generic reader with the schema.
// this line throws an exception
using (var r = AvroContainer.CreateGenericReader(schema, new MemoryStream(avroFragmentByeArray), true, new CodecFactory()))
{
}
}
}
}
}
我找到了方法。
有两种方式:
- 使用来自 C# 的 avro-tool.jar;
- 使用 Apache Avro 库(推荐)。
1°解
首先获取事件数据消息中的字节并保存在本地。
public List<string> SaveAvroBytesOnFile(EventData eventHubMessage, string functionAppDirectory)
{
try
{
string fileName = "avro-bytes.avro";
List<string> filesToProcess = new List<string>();
string singleFileNameToSave = fileName;
filesToProcess.Add(singleFileNameToSave);
string path = Path.Combine(functionAppDirectory,"AvroBytesFiles");
System.IO.Directory.CreateDirectory(path);
File.WriteAllBytes($"{path}{singleFileNameToSave}", eventHubMessage.GetBytes());
return filesToProcess;
}
catch (Exception ex)
{
throw;
}
}
比从 azure 函数调用 avro-tool.jar 并将输出重定向到变量
Process myProcess = new Process();
myProcess.StartInfo.UseShellExecute = false;
myProcess.StartInfo.FileName = @"D:\Program Files\Java\jdk1.8.0_73\bin\java.exe";
// execute avro tools
string avroResourcesPath = Path.Combine(functionAppDirectory, "AvroResources");
// here you must use the file with the bytes saved before and the avroschema file
myProcess.StartInfo.Arguments = $"-jar {Path.Combine(avroResourcesPath, "avro-tools-1.8.2.jar")} fragtojson --schema-file {Path.Combine(avroResourcesPath, "schemafile.avsc")} {Path.Combine(functionAppDirectory, "AvroBytesFiles", byteFileNames[i])}";
myProcess.StartInfo.RedirectStandardOutput = true;
myProcess.Start();
// print the output to a string
string output = myProcess.StandardOutput.ReadToEnd();
myProcess.WaitForExit();
Avro-tool 可能会使用与您需要的模式不同的模式反序列化字节,因此您需要将 avro-tool 模型映射到您的模型上。随着模型复杂性的变化,此步骤会消耗很多资源。
AvroToolModel avroToolModel= JsonConvert.DeserializeObject<AvroTool>(output);
// map the avro model in my model
MyMode myModel = new MyModel(avroToolModel);
2°解
这是推荐的解决方案。只需几行即可执行反序列化。
string schema = @"...";
using (MemoryStream memStream = new MemoryStream(eventHubMessage.GetBytes()))
{
memStream.Seek(0, SeekOrigin.Begin);
Schema writerSchema = Schema.Parse(schema);
Avro.Specific.SpecificDatumReader<MyModel> r = new Avro.Specific.SpecificDatumReader<MyModel>(writerSchema, writerSchema);
output = r.Read(null, new Avro.IO.BinaryDecoder(memStream));
}
模型 class 必须按如下方式实现 ISpecificRecord 接口:
[DataContract]
public class MyModel: ISpecificRecord
{
[DataMember]
public string Id;
[DataMember]
public enumP Type;
[DataMember]
public long Timestamp;
public Dictionary<string, string> Context;
public static Schema _SCHEMA = Avro.Schema.Parse(@"...");
public virtual Schema Schema
{
get
{
return Position._SCHEMA;
}
}
public object Get(int fieldPos)
{
switch (fieldPos)
{
case 0: return this.Id;
case 1: return this.Timestamp;
case 2: return this.Type;
case 3: return this.Context;
default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Get()");
};
}
public void Put(int fieldPos, object fieldValue)
{
switch (fieldPos)
{
case 0: this.Id = (System.String)fieldValue; break;
case 1: this.Timestamp = (System.Int64)fieldValue; break;
case 2: this.Type = (enumP)fieldValue; break;
case 3: this.Context = (Dictionary<string,string>)fieldValue; break;
default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Put()");
};
}
}
[DataContract]
public enum enumP
{
ONE, TWO, THREE
}
class MyModel 中的属性名称必须与使用的模式相同。
问题如下。
步骤:
- 应用程序将一些自定义 object 转换为 avro 片段(字节数组);
- 此 avro 片段被发送到 EventData 中的事件中心 object;
- 事件中心触发一个从事件中心接收 Mcrosoft.ServiceBus.Messaging.EventData 的 azure 函数;
- 我可以提取 EventData 的 body,它包含点 1 的 avro 片段(字节数组)。
我正在使用 Microsoft.Hadoop.Avro。
我有原始自定义 object(第 1 点)的架构,所以我尝试创建一个从 avro 片段读取的通用 reader,但我收到以下错误:
Invalid Avro object container in a stream. The header cannot be recognized.
似乎 Microsoft.Hadoop.Avro 只能管理完整的 avro 文件 (header + schema + body) 而不是 avro 片段 (body)。
使用 java avro-tool 我可以将模式添加到 avro 片段。在 .Net 或 .Net Core 中也可以吗?我该怎么办?
为简化以下代码,我用相关的 avro 文件替换了来自事件中心的 EventData。
using (Stream stream = new FileStream(@"...\trip-real-0-2019-03-14-12-14.avro", FileMode.Open, FileAccess.Read, FileShare.Read))
{
// create a generic reader for the event hub avro message
using (var reader = AvroContainer.CreateGenericReader(stream))
{
while (reader.MoveNext())
{
foreach (dynamic record in reader.Current.Objects)
{
//get the body of the event hub message (fragment avro bytes)
var avroFragmentByeArray = (byte[])(record.Body);
// try to create a generic reader with the schema.
// this line throws an exception
using (var r = AvroContainer.CreateGenericReader(schema, new MemoryStream(avroFragmentByeArray), true, new CodecFactory()))
{
}
}
}
}
}
我找到了方法。 有两种方式:
- 使用来自 C# 的 avro-tool.jar;
- 使用 Apache Avro 库(推荐)。
1°解 首先获取事件数据消息中的字节并保存在本地。
public List<string> SaveAvroBytesOnFile(EventData eventHubMessage, string functionAppDirectory)
{
try
{
string fileName = "avro-bytes.avro";
List<string> filesToProcess = new List<string>();
string singleFileNameToSave = fileName;
filesToProcess.Add(singleFileNameToSave);
string path = Path.Combine(functionAppDirectory,"AvroBytesFiles");
System.IO.Directory.CreateDirectory(path);
File.WriteAllBytes($"{path}{singleFileNameToSave}", eventHubMessage.GetBytes());
return filesToProcess;
}
catch (Exception ex)
{
throw;
}
}
比从 azure 函数调用 avro-tool.jar 并将输出重定向到变量
Process myProcess = new Process();
myProcess.StartInfo.UseShellExecute = false;
myProcess.StartInfo.FileName = @"D:\Program Files\Java\jdk1.8.0_73\bin\java.exe";
// execute avro tools
string avroResourcesPath = Path.Combine(functionAppDirectory, "AvroResources");
// here you must use the file with the bytes saved before and the avroschema file
myProcess.StartInfo.Arguments = $"-jar {Path.Combine(avroResourcesPath, "avro-tools-1.8.2.jar")} fragtojson --schema-file {Path.Combine(avroResourcesPath, "schemafile.avsc")} {Path.Combine(functionAppDirectory, "AvroBytesFiles", byteFileNames[i])}";
myProcess.StartInfo.RedirectStandardOutput = true;
myProcess.Start();
// print the output to a string
string output = myProcess.StandardOutput.ReadToEnd();
myProcess.WaitForExit();
Avro-tool 可能会使用与您需要的模式不同的模式反序列化字节,因此您需要将 avro-tool 模型映射到您的模型上。随着模型复杂性的变化,此步骤会消耗很多资源。
AvroToolModel avroToolModel= JsonConvert.DeserializeObject<AvroTool>(output);
// map the avro model in my model
MyMode myModel = new MyModel(avroToolModel);
2°解
这是推荐的解决方案。只需几行即可执行反序列化。
string schema = @"...";
using (MemoryStream memStream = new MemoryStream(eventHubMessage.GetBytes()))
{
memStream.Seek(0, SeekOrigin.Begin);
Schema writerSchema = Schema.Parse(schema);
Avro.Specific.SpecificDatumReader<MyModel> r = new Avro.Specific.SpecificDatumReader<MyModel>(writerSchema, writerSchema);
output = r.Read(null, new Avro.IO.BinaryDecoder(memStream));
}
模型 class 必须按如下方式实现 ISpecificRecord 接口:
[DataContract]
public class MyModel: ISpecificRecord
{
[DataMember]
public string Id;
[DataMember]
public enumP Type;
[DataMember]
public long Timestamp;
public Dictionary<string, string> Context;
public static Schema _SCHEMA = Avro.Schema.Parse(@"...");
public virtual Schema Schema
{
get
{
return Position._SCHEMA;
}
}
public object Get(int fieldPos)
{
switch (fieldPos)
{
case 0: return this.Id;
case 1: return this.Timestamp;
case 2: return this.Type;
case 3: return this.Context;
default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Get()");
};
}
public void Put(int fieldPos, object fieldValue)
{
switch (fieldPos)
{
case 0: this.Id = (System.String)fieldValue; break;
case 1: this.Timestamp = (System.Int64)fieldValue; break;
case 2: this.Type = (enumP)fieldValue; break;
case 3: this.Context = (Dictionary<string,string>)fieldValue; break;
default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Put()");
};
}
}
[DataContract]
public enum enumP
{
ONE, TWO, THREE
}
class MyModel 中的属性名称必须与使用的模式相同。