在 Azure 逻辑应用程序中反序列化 ServiceBus 内容

Deserializing ServiceBus content in Azure Logic App

我正在尝试读取 Azure 逻辑应用程序中消息的内容正文,但没有取得太大成功。我看到很多建议说body是base64编码的,建议用下面的解码:

@{json(base64ToString(triggerBody()?['ContentData']))}

base64ToString(...) 部分正在将内容正确解码为字符串,但该字符串似乎包含一个前缀,开头有一些额外的序列化信息:

@string3http://schemas.microsoft.com/2003/10/Serialization/�3{"Foo":"Bar"}

该字符串中还有一些额外的字符没有显示在我的浏览器中。所以 json(...) 函数不接受输入,而是给出错误。

InvalidTemplate. Unable to process template language expressions in action 'HTTP' inputs at line '1' and column '2451': 'The template language function 'json' parameter is not valid. The provided value @string3http://schemas.microsoft.com/2003/10/Serialization/�3{"Foo":"bar" } cannot be parsed: Unexpected character encountered while parsing value: @. Path '', line 0, position 0.. Please see https://aka.ms/logicexpressions#json for usage details.'.

作为参考,使用 .NET 服务总线客户端将消息添加到主题(客户端应该无关紧要,但这看起来有点像 C#):

await TopicClient.SendAsync(new BrokeredMessage(JsonConvert.SerializeObject(item)));

如何在我的逻辑应用程序中将其正确读取为 JSON 对象?

您可以将 substring 函数与 indexOflastIndexOf 一起使用,以仅获取 JSON 子字符串。

不幸的是,它相当复杂,但它应该看起来像这样:

@json(substring(base64ToString(triggerBody()?['ContentData']), indexof(base64ToString(triggerBody()?['ContentData']), '{'), add(1, sub(lastindexof(base64ToString(triggerBody()?['ContentData']), '}'), indexof(base64ToString(triggerBody()?['ContentData']), '}')))))

有关如何使用这些功能的更多信息here

HTH

由消息放置在ServiceBus上的方式引起的,特别是在C#代码中。我使用以下代码添加新消息:

var json = JsonConvert.SerializeObject(item);
var message = new BrokeredMessage(json);
await TopicClient.SendAsync(message);

这段代码看起来不错,在不同的 C# 服务之间工作没问题。问题是由 BrokeredMessage(Object) 构造函数序列化给它的有效负载的方式引起的:

Initializes a new instance of the BrokeredMessage class from a given object by using DataContractSerializer with a binary XmlDictionaryWriter.

表示内容被序列化为二进制XML,这说明了前缀和无法识别的字符。这在反序列化时被 C# 实现隐藏,它 returns 您期望的对象,但在使用不同的库(例如 Azure Logic Apps 使用的库)时变得明显。

有两种方法可以解决这个问题:

  • 确保接收方可以处理二进制 XML 格式的消息
  • 确保发件人确实使用了我们想要的格式,例如JSON.

Paco de la Cruz 的回答使用 substringindexOflastIndexOf:

处理第一种情况
@json(substring(base64ToString(triggerBody()?['ContentData']), indexof(base64ToString(triggerBody()?['ContentData']), '{'), add(1, sub(lastindexof(base64ToString(triggerBody()?['ContentData']), '}'), indexof(base64ToString(triggerBody()?['ContentData']), '}')))))

对于第二种情况,从源头上解决问题只需要使用 BrokeredMessage(Stream) 构造函数。这样,我们就可以直接控制内容:

var json = JsonConvert.SerializeObject(item);
var bytes = Encoding.UTF8.GetBytes(json);
var stream = new MemoryStream(bytes);
var message = new BrokeredMessage(stream, true);
await TopicClient.SendAsync(message);

Paco de la Cruz 解决方案对我有用,尽管我必须将表达式中的最后一个“}”换成“{”,否则它会找到错误的数据段结尾。

我也将它分成两步,使其更易于管理。

首先,我使用以下方法将消息中的解码字符串放入一个变量(我称之为 MC):

@{base64ToString(triggerBody()?['ContentData'])}

然后在另一个逻辑应用操作中执行子字符串提取:

@{substring(variables('MC'),indexof(variables('MC'),'{'),add(1,sub(lastindexof(variables('MC'),'}'),indexof(variables('MC'),'{'))))}

请注意,最后一个字符串文字“{”与 Paco 的解决方案相反。

这适用于我的测试用例,但我不确定它有多稳健。

此外,我将其保留为字符串,稍后在我的逻辑应用程序中将其转换为 JSON。

更新

我们发现偶尔(几百次运行中有 2 次)我们要丢弃的文本可以包含“{”字符。 我修改了我们的表达式以明确定位数据段的开头,对我来说是:

'{"IntegrationRequest"'

所以替换为:

@{substring(variables('MC'),indexof(variables('MC'),'{"IntegrationRequest"'),add(1,sub(lastindexof(variables('MC'),'}'),indexof(variables('MC'),'{"IntegrationRequest"'))))}