将模式添加到 Avro 片段 .Net

Add Schema To Avro Fragment .Net

问题如下。

步骤:

  1. 应用程序将一些自定义 object 转换为 avro 片段(字节数组);
  2. 此 avro 片段被发送到 EventData 中的事件中心 object;
  3. 事件中心触发一个从事件中心接收 Mcrosoft.ServiceBus.Messaging.EventData 的 azure 函数;
  4. 我可以提取 EventData 的 body,它包含点 1 的 avro 片段(字节数组)。

我正在使用 Microsoft.Hadoop.Avro。

我有原始自定义 object(第 1 点)的架构,所以我尝试创建一个从 avro 片段读取的通用 reader,但我收到以下错误:

Invalid Avro object container in a stream. The header cannot be recognized.

似乎 Microsoft.Hadoop.Avro 只能管理完整的 avro 文件 (header + schema + body) 而不是 avro 片段 (body)。

使用 java avro-tool 我可以将模式添加到 avro 片段。在 .Net 或 .Net Core 中也可以吗?我该怎么办?

为简化以下代码,我用相关的 avro 文件替换了来自事件中心的 EventData。

using (Stream stream = new FileStream(@"...\trip-real-0-2019-03-14-12-14.avro", FileMode.Open, FileAccess.Read, FileShare.Read))
{
    // create a generic reader for the event hub avro message
    using (var reader = AvroContainer.CreateGenericReader(stream))
    {
        while (reader.MoveNext())
        {
            foreach (dynamic record in reader.Current.Objects)
            {
                //get the body of the event hub message (fragment avro bytes)
                var avroFragmentByeArray = (byte[])(record.Body);

                // try to create a generic reader with the schema.
                // this line throws an exception
                using (var r = AvroContainer.CreateGenericReader(schema, new MemoryStream(avroFragmentByeArray), true, new CodecFactory()))                                    
                {

                }
            }
        }
    }
}

我找到了方法。 有两种方式:

  1. 使用来自 C# 的 avro-tool.jar;
  2. 使用 Apache Avro 库(推荐)。

1°解 首先获取事件数据消息中的字节并保存在本地。

public List<string> SaveAvroBytesOnFile(EventData eventHubMessage, string functionAppDirectory)
    {
        try
        {                
            string fileName = "avro-bytes.avro";
            List<string> filesToProcess = new List<string>();
            string singleFileNameToSave = fileName;
            filesToProcess.Add(singleFileNameToSave);              
            string path = Path.Combine(functionAppDirectory,"AvroBytesFiles");  
            System.IO.Directory.CreateDirectory(path);              
            File.WriteAllBytes($"{path}{singleFileNameToSave}", eventHubMessage.GetBytes());                
            return filesToProcess;
        }
        catch (Exception ex)
        {
            throw;
        }
    }

比从 azure 函数调用 avro-tool.jar 并将输出重定向到变量

 Process myProcess = new Process();
 myProcess.StartInfo.UseShellExecute = false;
 myProcess.StartInfo.FileName = @"D:\Program Files\Java\jdk1.8.0_73\bin\java.exe";                   
 // execute avro tools         
 string avroResourcesPath = Path.Combine(functionAppDirectory, "AvroResources");
 // here you must use the file with the bytes saved before and the avroschema file
 myProcess.StartInfo.Arguments = $"-jar {Path.Combine(avroResourcesPath, "avro-tools-1.8.2.jar")} fragtojson --schema-file {Path.Combine(avroResourcesPath, "schemafile.avsc")} {Path.Combine(functionAppDirectory, "AvroBytesFiles", byteFileNames[i])}";
 myProcess.StartInfo.RedirectStandardOutput = true;
 myProcess.Start();
 // print the output to a string 
 string output = myProcess.StandardOutput.ReadToEnd();
 myProcess.WaitForExit();

Avro-tool 可能会使用与您需要的模式不同的模式反序列化字节,因此您需要将 avro-tool 模型映射到您的模型上。随着模型复杂性的变化,此步骤会消耗很多资源。

AvroToolModel avroToolModel= JsonConvert.DeserializeObject<AvroTool>(output);
// map the avro model in my model
MyMode myModel = new MyModel(avroToolModel);

2°解

这是推荐的解决方案。只需几行即可执行反序列化。

string schema = @"...";
using (MemoryStream memStream = new MemoryStream(eventHubMessage.GetBytes()))
{
   memStream.Seek(0, SeekOrigin.Begin);
   Schema writerSchema = Schema.Parse(schema);
   Avro.Specific.SpecificDatumReader<MyModel> r = new Avro.Specific.SpecificDatumReader<MyModel>(writerSchema, writerSchema);
   output = r.Read(null, new Avro.IO.BinaryDecoder(memStream));
}

模型 class 必须按如下方式实现 ISpecificRecord 接口:

[DataContract]
public class MyModel: ISpecificRecord
{
    [DataMember]
    public string Id;
    [DataMember]
    public enumP Type;
    [DataMember]
    public long Timestamp;
    public Dictionary<string, string> Context;

    public static Schema _SCHEMA = Avro.Schema.Parse(@"...");

    public virtual Schema Schema
    {
        get
        {
            return Position._SCHEMA;
        }
    }

    public object Get(int fieldPos)
    {
        switch (fieldPos)
        {
            case 0: return this.Id;
            case 1: return this.Timestamp;
            case 2: return this.Type;                
            case 3: return this.Context;
            default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Get()");
        };
    }

    public void Put(int fieldPos, object fieldValue)
    {
        switch (fieldPos)
        {
            case 0: this.Id = (System.String)fieldValue; break;
            case 1: this.Timestamp = (System.Int64)fieldValue; break;
            case 2: this.Type = (enumP)fieldValue; break;                
            case 3: this.Context = (Dictionary<string,string>)fieldValue; break;
            default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Put()");
        };
    }
}

[DataContract]
public enum enumP
{
    ONE, TWO, THREE
}

class MyModel 中的属性名称必须与使用的模式相同。