为什么 Azure WebJob ServiceBus 默认反序列化 XML?

Why is Azure WebJob ServiceBus deserializing XML by default?

我有一个简单的 Azure WebJobs ServiceBusTrigger,看起来像

public static async void ProcessQueueMessage([ServiceBusTrigger("myqueuename")] String json, TextWriter log) { ... }

不幸的是,它无法将 JSON 反序列化为 XML(不足为奇)。我已经检查了有效负载并确认它只是一个 UTF-8 编码的字节数组。我有两个问题。

  1. 为什么假设我的字符串是 XML?
  2. 怎么说没有,没有XML,只有一个字符串?

堆栈跟踪:

System.InvalidOperationException: Exception binding parameter 'json' ---> System.Runtime.Serialization.SerializationException: There was an error deserializing the object of type System.String. The input source is not correctly formatted. ---> System.Xml.XmlException: The input source is not correctly formatted.
 at System.Xml.XmlExceptionHelper.ThrowXmlException(XmlDictionaryReader reader, String res, String arg1, String arg2, String arg3)
 at System.Xml.XmlBufferReader.ReadValue(XmlBinaryNodeType nodeType, ValueHandle value)
 at System.Xml.XmlBinaryReader.ReadNode()
 at System.Xml.XmlBinaryReader.Read()
 at System.Xml.XmlBaseReader.IsStartElement()
 at System.Xml.XmlBaseReader.IsStartElement(XmlDictionaryString localName, XmlDictionaryString namespaceUri)
 at System.Runtime.Serialization.XmlReaderDelegator.IsStartElement(XmlDictionaryString localname, XmlDictionaryString ns)
 at System.Runtime.Serialization.XmlObjectSerializer.IsRootElement(XmlReaderDelegator reader, DataContract contract, XmlDictionaryString name, XmlDictionaryString ns)
 at System.Runtime.Serialization.DataContractSerializer.InternalIsStartObject(XmlReaderDelegator reader)
 at System.Runtime.Serialization.DataContractSerializer.InternalReadObject(XmlReaderDelegator xmlReader, Boolean verifyObjectName, DataContractResolver dataContractResolver)
 at System.Runtime.Serialization.XmlObjectSerializer.ReadObjectHandleExceptions(XmlReaderDelegator reader, Boolean verifyObjectName, DataContractResolver dataContractResolver)
 --- End of inner exception stack trace ---
 at System.Runtime.Serialization.XmlObjectSerializer.ReadObjectHandleExceptions(XmlReaderDelegator reader, Boolean verifyObjectName, DataContractResolver dataContractResolver)
 at System.Runtime.Serialization.DataContractSerializer.ReadObject(XmlDictionaryReader reader, Boolean verifyObjectName)
 at Microsoft.ServiceBus.Messaging.DataContractBinarySerializer.ReadObject(XmlDictionaryReader reader, Boolean verifyObjectName)
 at System.Runtime.Serialization.XmlObjectSerializer.ReadObject(XmlReader reader, Boolean verifyObjectName)
 at System.Runtime.Serialization.XmlObjectSerializer.InternalReadObject(XmlReaderDelegator reader, Boolean verifyObjectName)
 at System.Runtime.Serialization.XmlObjectSerializer.InternalReadObject(XmlReaderDelegator reader, Boolean verifyObjectName, DataContractResolver dataContractResolver)
 at System.Runtime.Serialization.XmlObjectSerializer.ReadObjectHandleExceptions(XmlReaderDelegator reader, Boolean verifyObjectName, DataContractResolver dataContractResolver)
 at System.Runtime.Serialization.XmlObjectSerializer.ReadObject(XmlDictionaryReader reader)
 at Microsoft.ServiceBus.Messaging.DataContractBinarySerializer.ReadObject(Stream stream)
 at Microsoft.ServiceBus.Messaging.BrokeredMessage.GetBody[T](XmlObjectSerializer serializer)
 at Microsoft.ServiceBus.Messaging.BrokeredMessage.GetBody[T]()
 at Microsoft.Azure.WebJobs.ServiceBus.Triggers.BrokeredMessageToStringConverter.ConvertAsync(BrokeredMessage input, CancellationToken cancellationToken)
 at Microsoft.Azure.WebJobs.ServiceBus.Triggers.ConverterArgumentBindingProvider`1.ConverterArgumentBinding.<BindAsync>d__0.MoveNext()

编辑:WebJobs 文档表明,不仅我所做的应该起作用 (String),而且 ServiceBusTrigger 应该自动反序列化 JSON 对象。但是,如果我尝试取出我的 POCO,我仍然会收到 XML 反序列化错误。有趣的是,如果我将类型设置为 Byte[],我也会收到 XML 反序列化错误,这也应该有效。

编辑 2:Stream 也不起作用。看来 只有 BrokeredMessage 对触发器有效,而 GetBody 是我能找到的从 BrokeredMessage 中获取字符串的唯一方法。

作为对 #2 的回答,一个棘手的解决方法是从 ServiceBusTrigger 获取 BrokeredMessage 并调用 message.ToBody<Stream>()。然后,您可以使用常规方法将字节流转换为字符串。

将 JSON 编码的负载反序列化为对象的示例:


public static async void ProcessQueueMessage([ServiceBusTrigger("my-queue")] BrokeredMessage message)
{
    var stream = message.ToBody();
    using (var streamReader = new StreamReader(stream, Encoding.UTF8)
    {
        var json = await streamReader.ReadToEndAsync();
        var deserialized = JsonConvert.DeserializeObject(json);
    }
}

注意:如果有人有更好的答案,我仍然很感兴趣。在一个完美的世界中,我将能够为 ServiceBusTrigger 提供自定义反序列化器(基于 JSON.net 或只是一个字符串反序列化器),但如果可能的话我不知道该怎么做。

如果您的负载是字符串,您有两个选择:

  1. 将参数类型改为BrokeredMessage然后自己反序列化
  2. BrokeredMessage.ContentType 属性 设置为 text/plain(假设您可以控制生成消息的代码)

除了需要内容类型的奇怪 String 情况外,规则是服务总线负载只能反序列化为与负载相同的对象。那是因为 ServiceBus 二进制序列化程序。

我能够通过添加自定义消息处理器来进行反序列化工作。

https://github.com/Azure/azure-webjobs-sdk-samples/tree/master/BasicSamples/MiscOperations

提供自定义消息处理器示例

您将 ContentType 设置为 application/json,如下所示 -

public class CustomMessagingProvider : MessagingProvider
{
    private readonly ServiceBusConfiguration _config;

    public CustomMessagingProvider(ServiceBusConfiguration config) : base(config)
    {
        _config = config;
    }

    public override MessageProcessor CreateMessageProcessor(string entityPath)
    {
        return new CustomMessageProcessor(_config.MessageOptions);
    }

    private class CustomMessageProcessor : MessageProcessor
    {
        public CustomMessageProcessor(OnMessageOptions messageOptions)
            : base(messageOptions)
        {
        }

        public override Task<bool> BeginProcessingMessageAsync(BrokeredMessage message, CancellationToken cancellationToken)
        {
            message.ContentType = "application/json";

            return base.BeginProcessingMessageAsync(message, cancellationToken);
        }
    }
}

然后在 webjob 中配置 ServiceBusConfiguration 时设置此消息处理器。

 JobHostConfiguration config = new JobHostConfiguration();
        ServiceBusConfiguration serviceBusConfig = new ServiceBusConfiguration
        {
            ConnectionString = _servicesBusConnectionString
        };

        serviceBusConfig.MessagingProvider = new CustomMessagingProvider(serviceBusConfig);

        config.UseServiceBus(serviceBusConfig);

我在研究这个问题的时候发现了一些巫术。

我们在二进制序列化方面做得很好,但是当我们通过 webjobs 仪表板观看 webjobs 处理事件时,想看看消息的主体是什么。我从 github 上的 WebJobs ServiceBus 代码中窃取了代码来解决这个问题。我仍然有点困惑为什么你必须将 poco 序列化为 JSON 并将其打包到内存流中,然后将内容类型设置为 application/json。我的结论是,Azure 服务总线 SDK 内部一定有一些东西需要它。我还没有找到该代码,但在测试中效果很好。

首先我们遇到了这种情况,在 webjobs 仪表板中序列化的对象只是字节。

使用下面的代码将消息放到总线上后,我们得到:

我们不必更改消费端的签名,现在我们可以看到 BrokeredMessage 的正文。 public void PersistEvents([ServiceBusTrigger(EventProcessor.CoolTopicName, LoggingSubscription)] EventDto eventDto, TextWriter logger)

这是代码 Azure ServiceBus SDK 修复了它。
这是我的示例代码(从 SDK 借来的): string json = JsonConvert.SerializeObject(eventDto); // Using Json.net here MemoryStream stream = new MemoryStream(Encoding.UTF8.GetBytes(json), writable: false); BrokeredMessage message = new BrokeredMessage(stream); message.ContentType = "application/json"; client.Send(message);