Azure ServiceBus BrokeredMessage body is coming in as null

I have a timer-triggered function that sends objects to a Service Bus topic like this:

        [FunctionName("ProcessTermedEmployee")]
        public static async Task Run(
                        [TimerTrigger("%TimerInterval%"
            #if DEBUG
            , RunOnStartup=true
             #endif
            )] TimerInfo myTimer,
            [ServiceBus("%TermedEmployeeTopicName%", Connection = "ServiceBusConnectionString")] IAsyncCollector<string> termedEmployeeCollector,
            ILogger log)
        {
            log.LogInformation("ProcessTermedEmployee Timer function invoked.");
            try
            {
               // var termedEmployees = new List<TermedEmployee>();
                using (SqlConnection conn = new SqlConnection(Environment.GetEnvironmentVariable("HRSQLConNString")))
                {
                    conn.Open();
                    SqlCommand cmd = new SqlCommand("hp_Cdi_Get_Termed_Employees_Recent", conn);
                    cmd.CommandType = CommandType.StoredProcedure;
                    cmd.Parameters.Add(new SqlParameter("@TermedAfterDate", DateTime.Now.AddDays(-10)));
                    using (SqlDataReader rdr = cmd.ExecuteReader())
                    {
                        try
                        {
                            while (rdr.Read())
                            {
                                TermedEmployee termedEmployee = ConvertTermedEmployee(rdr);
                              //  termedEmployees.Add(termedEmployee);
                                await termedEmployeeCollector.AddAsync(JsonConvert.SerializeObject(termedEmployee));
                            }
                        }
                        catch (Exception ex)
                        {
                            ex.Data.Add("row", rdr);
                            log.LogCritical(ex, ex.Message);
                            throw; // rethrow without resetting the stack trace
                        }
                    }
                }
                log.LogInformation("ProcessTermedEmployee Timer function finished.");
            }
            catch (Exception ex)
            {
                log.LogCritical(ex, ex.Message);
                throw; // rethrow without resetting the stack trace
            }
        }

Then I have a Service Bus triggered function that should receive the message and add it to CosmosDB:

    public static class LogTermedEmployee
    {
        [FunctionName("LogTermedEmployee")]
        public static async Task Run([ServiceBusTrigger("%TermedEmployeeTopicName%", "%ServiceBusSubscriptionName%", Connection = "ServiceBusConnectionString")] BrokeredMessage message,
                [CosmosDB(
                databaseName: "%CosmosDbName%",
                collectionName: "%TermedEmployeesLogCollection%",
                ConnectionStringSetting = "CosmosConnection")]
                IAsyncCollector<TermedEmployeeLog> termedEmployeesLogCollection,
          ILogger log)
          {
            log.LogInformation($"C# ServiceBus topic trigger function LogTermedEmployee, processed at {DateTime.Now}");
            try
            {
          
                if (message.GetBody<TermedEmployee>() != null)
                {
                    //StreamReader reader = new StreamReader(message.Body);
                    // string s = System.Text.Encoding.Default.GetString(message.Body);
                    var termedEmp = JsonConvert.DeserializeObject<TermedEmployee>(message.GetBody<string>());
                    await termedEmployeesLogCollection.AddAsync(new TermedEmployeeLog()
                    {
                        TermedEmployee = termedEmp,
                        DateOfProcess = DateTime.Now,
                        Id = Guid.NewGuid(),
                        PartitionKey = "TermedEmployee",
                        ModifiedOn = DateTime.Now,
                        ModifiedBy = "TermedEployeeLogService",
                        MessageId = ""
                    }); // message.MessageId });
                }
            }
            catch (Exception ex)
            {
                log.LogCritical(ex, ex.Message);
                throw; // rethrow without resetting the stack trace
            }
        }
    }

Package references:

    <PackageReference Include="Azure.Messaging.ServiceBus" Version="7.6.0" />
    <PackageReference Include="AzureFunctions.Extensions.DependencyInjection" Version="1.1.3" />
    <PackageReference Include="CDI.Utilities.LogHelpers" Version="0.1.3" />
    <PackageReference Include="Microsoft.AspNetCore.Mvc.Abstractions" Version="2.2.0" />
    <PackageReference Include="Microsoft.Azure.Functions.Extensions" Version="1.1.0" />
    <PackageReference Include="Microsoft.Azure.ServiceBus" Version="5.2.0" />
    <PackageReference Include="Microsoft.Azure.WebJobs.Extensions" Version="4.0.1" />
    <PackageReference Include="Microsoft.Azure.WebJobs.Extensions.CosmosDB" Version="3.0.10" />
    <PackageReference Include="Microsoft.Azure.WebJobs.Extensions.Http" Version="3.0.12" />
    <PackageReference Include="Microsoft.Azure.WebJobs.Extensions.ServiceBus" Version="5.2.0" />
    <PackageReference Include="Microsoft.Azure.WebJobs.Script.ExtensionsMetadataGenerator" Version="4.0.1" />
    <PackageReference Include="Microsoft.NET.Sdk.Functions" Version="4.0.1" />
    <PackageReference Include="System.Data.SqlClient" Version="4.8.3" />
    <PackageReference Include="WindowsAzure.ServiceBus" Version="6.2.2" />

The problem

In the signature of the ServiceBus trigger:

If I use BrokeredMessage, message.GetBody() is null.

If I use the obsolete Message object, Message.Body is null.

But if I bind to a plain string, I get the correct JSON.

Any suggestions?

I suggest changing BrokeredMessage in the trigger to ServiceBusReceivedMessage:

    public static async Task Run(
        [ServiceBusTrigger(
            "%TermedEmployeeTopicName%",
            "%ServiceBusSubscriptionName%",
            Connection = "ServiceBusConnectionString")]
        ServiceBusReceivedMessage message,
        [CosmosDB(
            databaseName: "%CosmosDbName%",
            collectionName: "%TermedEmployeesLogCollection%",
            ConnectionStringSetting = "CosmosConnection")]
        IAsyncCollector<TermedEmployeeLog> termedEmployeesLogCollection,
        ILogger log)

I believe this will resolve the binding issue.
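
For illustration, here is a rough sketch of how the whole function might look with that change. It assumes the TermedEmployee and TermedEmployeeLog types from the question (not shown here) and that MessageId on the log type is a string. ServiceBusReceivedMessage exposes the payload as a BinaryData in Body, so there is no GetBody<T>() call; the sketch reads the body as text and deserializes it with Json.NET. Treat it as a starting point, not a tested implementation.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;

public static class LogTermedEmployee
{
    [FunctionName("LogTermedEmployee")]
    public static async Task Run(
        [ServiceBusTrigger(
            "%TermedEmployeeTopicName%",
            "%ServiceBusSubscriptionName%",
            Connection = "ServiceBusConnectionString")]
        ServiceBusReceivedMessage message,
        [CosmosDB(
            databaseName: "%CosmosDbName%",
            collectionName: "%TermedEmployeesLogCollection%",
            ConnectionStringSetting = "CosmosConnection")]
        IAsyncCollector<TermedEmployeeLog> termedEmployeesLogCollection,
        ILogger log)
    {
        // Body is a BinaryData; ToString() decodes its bytes as UTF-8 text.
        string json = message.Body.ToString();

        // TermedEmployee is the type from the question (assumed, not defined here).
        var termedEmp = JsonConvert.DeserializeObject<TermedEmployee>(json);
        if (termedEmp == null)
        {
            log.LogWarning("Message {MessageId} had an empty body; skipping.", message.MessageId);
            return;
        }

        await termedEmployeesLogCollection.AddAsync(new TermedEmployeeLog
        {
            TermedEmployee = termedEmp,
            DateOfProcess = DateTime.Now,
            Id = Guid.NewGuid(),
            PartitionKey = "TermedEmployee",
            ModifiedOn = DateTime.Now,
            ModifiedBy = "TermedEployeeLogService",
            MessageId = message.MessageId
        });
    }
}
```

The sender in the question serializes with JsonConvert.SerializeObject and sends a plain string, which is why reading the body back as text and deserializing it should round-trip.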

Additional context

Starting with v5.0.0, the Microsoft.Azure.WebJobs.Extensions.ServiceBus package uses Azure.Messaging.ServiceBus internally. The new package has no BrokeredMessage type. Incoming messages are of type ServiceBusReceivedMessage, and outgoing messages are of type ServiceBusMessage.
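
If you ever need more control on the sending side (for example, to set a content type), the outgoing type can be used there too. The sketch below is only an illustration: TermedEmployeeSender and SendAsync are hypothetical names that are not in the original code, and TermedEmployee is the type from the question. It assumes the output binding is declared as IAsyncCollector<ServiceBusMessage> instead of IAsyncCollector<string>, which the 5.x extension should accept as far as I know.

```csharp
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Microsoft.Azure.WebJobs;
using Newtonsoft.Json;

public static class TermedEmployeeSender
{
    // Hypothetical helper: wraps one employee in a ServiceBusMessage
    // and hands it to the output binding's collector.
    public static Task SendAsync(
        IAsyncCollector<ServiceBusMessage> collector,
        TermedEmployee termedEmployee)
    {
        // The string constructor stores the JSON text as the message body.
        var message = new ServiceBusMessage(JsonConvert.SerializeObject(termedEmployee))
        {
            ContentType = "application/json"
        };
        return collector.AddAsync(message);
    }
}
```

Sending a plain string, as the timer function in the question already does, works as well; this form only matters when message properties need to be set.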

More information can be found in the Microsoft.Azure.WebJobs.Extensions.ServiceBus docs.