基于 Apache Camel 的生产者和 .NET 消费者之间的 Azure 服务总线互操作性

Azure Service Bus interoperability between Apache Camel based producer and .NET consumer

我们正在尝试在基于 Apache Camel 的系统与在 Azure 服务总线主题中生成消息的系统和这些消息的基于 .NET 的消费者之间构建基于事件的集成。

生产者使用服务总线的 AMQP 接口,而基于 .NET 的消费者使用 Microsoft 在命名空间 Azure.Messaging.ServiceBus.

中的当前 API

当我们尝试访问收到的消息中的正文时,如下所示:

private async Task ProcessMessagesAsync(ProcessMessageEventArgs args)
{
    try { 
        message = Encoding.UTF8.GetString(args.Message.Body);
    }
    catch( Exception e)
    {
        _logger.LogError(e, "Body not decoded: Message: {@message}", e.Message);
    }

    _logger.LogInformation("Body Type: {@bodytype}, Content-Type: {@contenttype}, Message: {@message}, Properties: {@properties}", raw.Body.BodyType, args.Message.ContentType, message, args.Message.ApplicationProperties);

    await args.CompleteMessageAsync(args.Message);
}

引发了以下异常:

Value cannot be retrieved using the Body property.Use GetRawAmqpMessage to access the underlying Amqp Message object.
System.NotSupportedException: Value cannot be retrieved using the Body property.Use GetRawAmqpMessage to access the underlying Amqp Message object.
   at Azure.Messaging.ServiceBus.Amqp.AmqpMessageExtensions.GetBody(AmqpAnnotatedMessage message)
   at Azure.Messaging.ServiceBus.ServiceBusReceivedMessage.get_Body()

使用服务总线资源管理器查看主题时,消息看起来很奇怪:

@string3http://schemas.microsoft.com/2003/10/Serialization/�_{"metadata":{"version":"1.0.0","message_id":"AGHcNehoD-hK0pPJCSga9v9sXFwC","message_timestamp":"2022-01-10T13:34:32.778Z"},"data":{"source_timestamp":"2022-01-05T17:20:31.000","material":"101052"}}

当消息通过 .NET 生产者发送到另一个主题时,主题中有一个明文 JSON 正文,正如预期的那样。

有没有人成功地使用基于上述两个框架的组件构建了 Azure 服务总线解决方案,互操作性起作用的诀窍是什么?谁可以使用 Camel AMQP 生产者创建具有 BodyType 数据的消息,以便 .NET 服务总线客户端库无需使用 GetRawAmqpMessage 即可解码正文?

我不知道 Camel 使用的是什么格式,但该错误消息表明您尝试解码的 Body 不是 AMQP 数据体,而这是服务总线客户端的数据体图书馆使用和期望。

为了读取编码为 AMQP 值或序列的主体,您需要使用 AMQP 格式的数据,而不是使用 ServiceBusReceivedMessage 便利层。为此,您需要在 ServiceBusReceivedMessage 上调用 GetRawAmqpMessage,这会给您一个 AmqpAnnotatedMessage.

带注释的消息 Body 属性 将 return 和 AmqpMessageBody instance which will allow you to query the BodyType 并使用 TryGet 上的方法之一以其原始格式检索数据AmqpMessageBody.

在我们的生产者端使用 SAP Cloud Integration,当 AMQP 适配器的 Message Type 参数设置为 Binary 时,根据:

https://help.sap.com/viewer/368c481cd6954bdfa5d0435479fd4eaf/Cloud/en-US/d5660c146a93483692335e9d79a8c58f.html.

这似乎对应 Apache Camel jmsMessageType 设置为 Bytes, 有关详细信息,请参阅 https://camel.apache.org/components/3.14.x/amqp-component.html

ServiceBusReceivedMessage 中正文的解码工作正常,BodyType 设置为 Data。如果在生产者端使用 TextBodyType 将被设置为 Value,这会导致正文解码出现问题。