Stream Analytics 'Invalid Avro Format, drop invalid record.'

I am trying to serialize my C# classes to Avro using the Microsoft Avro Library and send them to an Event Hub. However, when I try to read the data through Stream Analytics, the logs show the error 'Invalid Avro Format, drop invalid record'.

More details: I serialize to Avro format using the reflection approach shown in https://azure.microsoft.com/en-in/documentation/articles/hdinsight-dotnet-avro-serialization/ and send the result to the Event Hub:

        //Create a new AvroSerializer instance and specify a custom serialization strategy AvroDataContractResolver
        //for serializing only properties attributed with DataContract/DataMember
        var avroSerializer = AvroSerializer.Create<SensorData>();

        //Create a memory stream buffer
        using (var buffer = new MemoryStream())
        {
            //Create a data set by using sample class and struct
            var expected = new SensorData { Value = new byte[] { 1, 2, 3, 4, 5 }, Position = new Location { Room = 243, Floor = 1 } };

            //Serialize the data to the specified stream
            avroSerializer.Serialize(buffer, expected);
            var bytes = buffer.ToArray();
            var data = new EventData(bytes) {PartitionKey = "deviceId"};
            // send to event hub client
            eventHubClient.Send(data);
        }

The events are published to the Event Hub without problems. I created a worker role that can consume these events and deserialize them.

However, when I set this Event Hub as the input of my Stream Analytics job and set the event serialization format to 'avro', I get the following errors:

Message: Invalid Avro Format, drop invalid record.

Message: IncorrectSerializationFormat errors are occuring too rapidly. They are being suppressed temporarily

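I suppose I also have to include the Avro schema. Can anyone point me to the correct way to serialize a C# class to Avro so that Stream Analytics can understand it?

Thanks for your time.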

You must include the schema. Below is an example of how to send events together with the schema. It uses AvroContainer.

        var eventHubClient = EventHubClient.CreateFromConnectionString("ReplaceConnectionString","ReplaceEventHubPath");
        int numberOfEvents = 10;
        using (var memoryStream = new MemoryStream())
        //The container writer emits the Avro header, which carries the schema
        using (var avroWriter = AvroContainer.CreateWriter<SensorData>(memoryStream, Codec.Null))
        using (var sqWriter = new SequentialWriter<SensorData>(avroWriter, numberOfEvents))
        {
            //Write all sample events through the container writer
            Enumerable.Range(0, numberOfEvents)
                .Select(i => new SensorData() { Id = "DeviceId", Value = i })
                .ToList()
                .ForEach(data => sqWriter.Write(data));
            memoryStream.Seek(0, SeekOrigin.Begin);
            //ToArray copies the whole buffer regardless of the stream position
            var eventData = new EventData(memoryStream.ToArray());
            eventHubClient.Send(eventData);
        }
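
For reference, here is a minimal sketch of two pieces the example above leaves implicit. The shape of SensorData is an assumption inferred from the `Id` and `Value` initializers in the answer (the question's own class has different members). `AvroPayloadCheck` and `LooksLikeAvroContainer` are hypothetical helper names, not part of the Microsoft Avro Library. The helper relies only on the Avro specification, which says that an object container file starts with the four bytes 'O', 'b', 'j', 0x01, followed by metadata that includes the schema. It can be used to check whether a payload carries that header before it is sent.

```csharp
using System;
using System.Runtime.Serialization;

// Assumed shape of the SensorData class used in the answer above.
[DataContract]
public class SensorData
{
    [DataMember]
    public string Id { get; set; }

    [DataMember]
    public int Value { get; set; }
}

// Hypothetical helper, not part of any Avro library.
public static class AvroPayloadCheck
{
    // Magic bytes that open every Avro object container file: 'O', 'b', 'j', 0x01.
    private static readonly byte[] Magic = { 0x4F, 0x62, 0x6A, 0x01 };

    // Returns true if the payload begins with the Avro container magic bytes.
    public static bool LooksLikeAvroContainer(byte[] payload)
    {
        if (payload == null || payload.Length < Magic.Length)
        {
            return false;
        }

        for (int i = 0; i < Magic.Length; i++)
        {
            if (payload[i] != Magic[i])
            {
                return false;
            }
        }

        return true;
    }
}
```

Calling `AvroPayloadCheck.LooksLikeAvroContainer(memoryStream.ToArray())` on the buffer from the answer should return true if the container header was written. The bytes produced by a bare `avroSerializer.Serialize(buffer, expected)` call, as in the question, would not be expected to start with this header, because that call writes only the encoded record.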