Azure ServiceBus BrokeredMessage 正文为空
Azure ServiceBus BrokeredMessage body is coming in as null
我有一个由计时器触发(Timer Trigger)的函数,它像下面这样将对象发送到服务总线主题:
/// <summary>
/// Timer-triggered function that runs the hp_Cdi_Get_Termed_Employees_Recent stored
/// procedure (cutoff: ten days before now) and publishes every returned row, converted
/// to a <c>TermedEmployee</c> and serialized as JSON, to the Service Bus topic.
/// </summary>
/// <param name="myTimer">Timer information supplied by the trigger; not read by this method.</param>
/// <param name="termedEmployeeCollector">
/// Collector bound to the topic named by the TermedEmployeeTopicName setting; one JSON string is added per row.
/// </param>
/// <param name="log">Logger used for start/finish messages and failures.</param>
/// <returns>A task that completes when every row has been handed to the collector.</returns>
/// <remarks>Any exception is logged once at Critical level and rethrown with its original stack trace.</remarks>
[FunctionName("ProcessTermedEmployee")]
public static async Task Run(
    [TimerTrigger("%TimerInterval%"
#if DEBUG
        , RunOnStartup=true
#endif
    )] TimerInfo myTimer,
    [ServiceBus("%TermedEmployeeTopicName%", Connection = "ServiceBusConnectionString")] IAsyncCollector<string> termedEmployeeCollector,
    ILogger log)
{
    log.LogInformation("ProcessTermedEmployee Timer function invoked.");
    try
    {
        // Both the connection and the command are disposable; dispose them deterministically.
        using (SqlConnection conn = new SqlConnection(Environment.GetEnvironmentVariable("HRSQLConNString")))
        using (SqlCommand cmd = new SqlCommand("hp_Cdi_Get_Termed_Employees_Recent", conn))
        {
            cmd.CommandType = CommandType.StoredProcedure;
            cmd.Parameters.Add(new SqlParameter("@TermedAfterDate", DateTime.Now.AddDays(-10)));

            // Use the async ADO.NET APIs so the function does not block a thread on database I/O.
            await conn.OpenAsync();
            using (SqlDataReader rdr = await cmd.ExecuteReaderAsync())
            {
                // 1-based index of the last row successfully read; attached to failures for diagnostics.
                int rowNumber = 0;
                try
                {
                    while (await rdr.ReadAsync())
                    {
                        rowNumber++;
                        TermedEmployee termedEmployee = ConvertTermedEmployee(rdr);
                        await termedEmployeeCollector.AddAsync(JsonConvert.SerializeObject(termedEmployee));
                    }
                }
                catch (Exception ex)
                {
                    // Record the row index rather than the reader itself: the reader is disposed
                    // as soon as this using block unwinds, so it is useless to whoever inspects ex.Data.
                    ex.Data["row"] = rowNumber;

                    // Logging happens once, in the outer catch; "throw;" keeps the original stack trace.
                    throw;
                }
            }
        }
        log.LogInformation("ProcessTermedEmployee Timer function finished.");
    }
    catch (Exception ex)
    {
        log.LogCritical(ex, ex.Message);
        throw;
    }
}
然后我有一个服务总线触发器函数,它应该接收该消息并将其写入 CosmosDB:
/// <summary>
/// Hosts the Service Bus topic-triggered function that logs termed-employee messages to Cosmos DB.
/// </summary>
public static class LogTermedEmployee
{
    /// <summary>
    /// Reads the JSON body of the received message, deserializes it into a <c>TermedEmployee</c>
    /// and adds a <c>TermedEmployeeLog</c> entry wrapping it to the Cosmos DB collector.
    /// </summary>
    /// <param name="message">Message received from the topic subscription.</param>
    /// <param name="termedEmployeesLogCollection">Collector bound to the Cosmos DB collection that stores the log entries.</param>
    /// <param name="log">Logger used for progress and failures.</param>
    /// <returns>A task that completes when the log entry has been handed to the collector.</returns>
    /// <remarks>
    /// Messages without a body are skipped with a warning. Any exception is logged at Critical
    /// level and rethrown with its original stack trace.
    /// </remarks>
    // NOTE(review): BrokeredMessage is kept so the signature is unchanged. From v5.0.0 on,
    // Microsoft.Azure.WebJobs.Extensions.ServiceBus is built on Azure.Messaging.ServiceBus, which
    // has no BrokeredMessage type - binding ServiceBusReceivedMessage (or string) is the likely
    // fix if the body still arrives empty. Confirm against the installed extension version.
    [FunctionName("LogTermedEmployee")]
    public static async Task Run(
        [ServiceBusTrigger("%TermedEmployeeTopicName%", "%ServiceBusSubscriptionName%", Connection = "ServiceBusConnectionString")] BrokeredMessage message,
        [CosmosDB(
            databaseName: "%CosmosDbName%",
            collectionName: "%TermedEmployeesLogCollection%",
            ConnectionStringSetting = "CosmosConnection")]
        IAsyncCollector<TermedEmployeeLog> termedEmployeesLogCollection,
        ILogger log)
    {
        // The original literal was split across two source lines, which a regular
        // interpolated string does not allow; it is now a single-line literal.
        log.LogInformation($"C# ServiceBus topic trigger function LogTermedEmployee, processed at {DateTime.Now}");
        try
        {
            // Read the body exactly once and as the type that was sent (a JSON string).
            // BrokeredMessage.GetBody<T>() is documented as single-read, so the previous
            // GetBody<TermedEmployee>() probe followed by GetBody<string>() could not work.
            string body = message.GetBody<string>();
            if (string.IsNullOrEmpty(body))
            {
                log.LogWarning("LogTermedEmployee received a message with an empty body; nothing was logged.");
                return;
            }

            TermedEmployee termedEmp = JsonConvert.DeserializeObject<TermedEmployee>(body);

            // Capture the timestamp once so DateOfProcess and ModifiedOn agree.
            DateTime processedAt = DateTime.Now;
            await termedEmployeesLogCollection.AddAsync(new TermedEmployeeLog
            {
                TermedEmployee = termedEmp,
                DateOfProcess = processedAt,
                Id = Guid.NewGuid(),
                PartitionKey = "TermedEmployee",
                ModifiedOn = processedAt,
                ModifiedBy = "TermedEployeeLogService",
                MessageId = ""
            });
        }
        catch (Exception ex)
        {
            log.LogCritical(ex, ex.Message);

            // "throw;" (not "throw ex;") preserves the original stack trace.
            throw;
        }
    }
}
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.6.0" />
<PackageReference Include="AzureFunctions.Extensions.DependencyInjection" Version="1.1.3" />
<PackageReference Include="CDI.Utilities.LogHelpers" Version="0.1.3" />
<PackageReference Include="Microsoft.AspNetCore.Mvc.Abstractions" Version="2.2.0" />
<PackageReference Include="Microsoft.Azure.Functions.Extensions" Version="1.1.0" />
<PackageReference Include="Microsoft.Azure.ServiceBus" Version="5.2.0" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions" Version="4.0.1" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.CosmosDB" Version="3.0.10" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.Http" Version="3.0.12" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.ServiceBus" Version="5.2.0" />
<PackageReference Include="Microsoft.Azure.WebJobs.Script.ExtensionsMetadataGenerator" Version="4.0.1" />
<PackageReference Include="Microsoft.NET.Sdk.Functions" Version="4.0.1" />
<PackageReference Include="System.Data.SqlClient" Version="4.8.3" />
<PackageReference Include="WindowsAzure.ServiceBus" Version="6.2.2" />
问题
在 ServiceBus 触发器的方法签名中:
如果我使用 BrokeredMessage message,则 message.GetBody() 为 null;
如果我使用已过时(Obsolete)的 Message 对象,则 Message.Body 为 null;
但如果我使用简单的 string,就能得到正确的 JSON。
有什么建议吗?
我建议将触发器中的 BrokeredMessage 更改为 ServiceBusReceivedMessage:
// Revised signature for the Service Bus-triggered Run method: the bindings are the same
// as before; only the type of the message parameter changes to ServiceBusReceivedMessage.
public static async Task Run(
[ServiceBusTrigger(
"%TermedEmployeeTopicName%",
"%ServiceBusSubscriptionName%",
Connection = "ServiceBusConnectionString")]
ServiceBusReceivedMessage message,
[CosmosDB(
databaseName: "%CosmosDbName%",
collectionName: "%TermedEmployeesLogCollection%",
ConnectionStringSetting = "CosmosConnection")]
IAsyncCollector<TermedEmployeeLog> termedEmployeesLogCollection,
ILogger log)
我相信这会解决绑定问题。
其他上下文
从 v5.0.0 开始,Microsoft.Azure.WebJobs.Extensions.ServiceBus 包开始在内部使用 Azure.Messaging.ServiceBus。新包中没有 BrokeredMessage 类型:传入消息的类型为 ServiceBusReceivedMessage,传出消息的类型为 ServiceBusMessage。
可以在 Microsoft.Azure.WebJobs.Extensions.ServiceBus docs 中找到更多信息。
我有一个由计时器触发(Timer Trigger)的函数,它像下面这样将对象发送到服务总线主题:
/// <summary>
/// Timer-triggered function that runs the hp_Cdi_Get_Termed_Employees_Recent stored
/// procedure (cutoff: ten days before now) and publishes every returned row, converted
/// to a <c>TermedEmployee</c> and serialized as JSON, to the Service Bus topic.
/// </summary>
/// <param name="myTimer">Timer information supplied by the trigger; not read by this method.</param>
/// <param name="termedEmployeeCollector">
/// Collector bound to the topic named by the TermedEmployeeTopicName setting; one JSON string is added per row.
/// </param>
/// <param name="log">Logger used for start/finish messages and failures.</param>
/// <returns>A task that completes when every row has been handed to the collector.</returns>
/// <remarks>Any exception is logged once at Critical level and rethrown with its original stack trace.</remarks>
[FunctionName("ProcessTermedEmployee")]
public static async Task Run(
    [TimerTrigger("%TimerInterval%"
#if DEBUG
        , RunOnStartup=true
#endif
    )] TimerInfo myTimer,
    [ServiceBus("%TermedEmployeeTopicName%", Connection = "ServiceBusConnectionString")] IAsyncCollector<string> termedEmployeeCollector,
    ILogger log)
{
    log.LogInformation("ProcessTermedEmployee Timer function invoked.");
    try
    {
        // Both the connection and the command are disposable; dispose them deterministically.
        using (SqlConnection conn = new SqlConnection(Environment.GetEnvironmentVariable("HRSQLConNString")))
        using (SqlCommand cmd = new SqlCommand("hp_Cdi_Get_Termed_Employees_Recent", conn))
        {
            cmd.CommandType = CommandType.StoredProcedure;
            cmd.Parameters.Add(new SqlParameter("@TermedAfterDate", DateTime.Now.AddDays(-10)));

            // Use the async ADO.NET APIs so the function does not block a thread on database I/O.
            await conn.OpenAsync();
            using (SqlDataReader rdr = await cmd.ExecuteReaderAsync())
            {
                // 1-based index of the last row successfully read; attached to failures for diagnostics.
                int rowNumber = 0;
                try
                {
                    while (await rdr.ReadAsync())
                    {
                        rowNumber++;
                        TermedEmployee termedEmployee = ConvertTermedEmployee(rdr);
                        await termedEmployeeCollector.AddAsync(JsonConvert.SerializeObject(termedEmployee));
                    }
                }
                catch (Exception ex)
                {
                    // Record the row index rather than the reader itself: the reader is disposed
                    // as soon as this using block unwinds, so it is useless to whoever inspects ex.Data.
                    ex.Data["row"] = rowNumber;

                    // Logging happens once, in the outer catch; "throw;" keeps the original stack trace.
                    throw;
                }
            }
        }
        log.LogInformation("ProcessTermedEmployee Timer function finished.");
    }
    catch (Exception ex)
    {
        log.LogCritical(ex, ex.Message);
        throw;
    }
}
然后我有一个服务总线触发器函数,它应该接收该消息并将其写入 CosmosDB:
/// <summary>
/// Hosts the Service Bus topic-triggered function that logs termed-employee messages to Cosmos DB.
/// </summary>
public static class LogTermedEmployee
{
    /// <summary>
    /// Reads the JSON body of the received message, deserializes it into a <c>TermedEmployee</c>
    /// and adds a <c>TermedEmployeeLog</c> entry wrapping it to the Cosmos DB collector.
    /// </summary>
    /// <param name="message">Message received from the topic subscription.</param>
    /// <param name="termedEmployeesLogCollection">Collector bound to the Cosmos DB collection that stores the log entries.</param>
    /// <param name="log">Logger used for progress and failures.</param>
    /// <returns>A task that completes when the log entry has been handed to the collector.</returns>
    /// <remarks>
    /// Messages without a body are skipped with a warning. Any exception is logged at Critical
    /// level and rethrown with its original stack trace.
    /// </remarks>
    // NOTE(review): BrokeredMessage is kept so the signature is unchanged. From v5.0.0 on,
    // Microsoft.Azure.WebJobs.Extensions.ServiceBus is built on Azure.Messaging.ServiceBus, which
    // has no BrokeredMessage type - binding ServiceBusReceivedMessage (or string) is the likely
    // fix if the body still arrives empty. Confirm against the installed extension version.
    [FunctionName("LogTermedEmployee")]
    public static async Task Run(
        [ServiceBusTrigger("%TermedEmployeeTopicName%", "%ServiceBusSubscriptionName%", Connection = "ServiceBusConnectionString")] BrokeredMessage message,
        [CosmosDB(
            databaseName: "%CosmosDbName%",
            collectionName: "%TermedEmployeesLogCollection%",
            ConnectionStringSetting = "CosmosConnection")]
        IAsyncCollector<TermedEmployeeLog> termedEmployeesLogCollection,
        ILogger log)
    {
        // The original literal was split across two source lines, which a regular
        // interpolated string does not allow; it is now a single-line literal.
        log.LogInformation($"C# ServiceBus topic trigger function LogTermedEmployee, processed at {DateTime.Now}");
        try
        {
            // Read the body exactly once and as the type that was sent (a JSON string).
            // BrokeredMessage.GetBody<T>() is documented as single-read, so the previous
            // GetBody<TermedEmployee>() probe followed by GetBody<string>() could not work.
            string body = message.GetBody<string>();
            if (string.IsNullOrEmpty(body))
            {
                log.LogWarning("LogTermedEmployee received a message with an empty body; nothing was logged.");
                return;
            }

            TermedEmployee termedEmp = JsonConvert.DeserializeObject<TermedEmployee>(body);

            // Capture the timestamp once so DateOfProcess and ModifiedOn agree.
            DateTime processedAt = DateTime.Now;
            await termedEmployeesLogCollection.AddAsync(new TermedEmployeeLog
            {
                TermedEmployee = termedEmp,
                DateOfProcess = processedAt,
                Id = Guid.NewGuid(),
                PartitionKey = "TermedEmployee",
                ModifiedOn = processedAt,
                ModifiedBy = "TermedEployeeLogService",
                MessageId = ""
            });
        }
        catch (Exception ex)
        {
            log.LogCritical(ex, ex.Message);

            // "throw;" (not "throw ex;") preserves the original stack trace.
            throw;
        }
    }
}
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.6.0" />
<PackageReference Include="AzureFunctions.Extensions.DependencyInjection" Version="1.1.3" />
<PackageReference Include="CDI.Utilities.LogHelpers" Version="0.1.3" />
<PackageReference Include="Microsoft.AspNetCore.Mvc.Abstractions" Version="2.2.0" />
<PackageReference Include="Microsoft.Azure.Functions.Extensions" Version="1.1.0" />
<PackageReference Include="Microsoft.Azure.ServiceBus" Version="5.2.0" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions" Version="4.0.1" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.CosmosDB" Version="3.0.10" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.Http" Version="3.0.12" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.ServiceBus" Version="5.2.0" />
<PackageReference Include="Microsoft.Azure.WebJobs.Script.ExtensionsMetadataGenerator" Version="4.0.1" />
<PackageReference Include="Microsoft.NET.Sdk.Functions" Version="4.0.1" />
<PackageReference Include="System.Data.SqlClient" Version="4.8.3" />
<PackageReference Include="WindowsAzure.ServiceBus" Version="6.2.2" />
问题
在 ServiceBus 触发器的方法签名中:
如果我使用 BrokeredMessage message,则 message.GetBody() 为 null;
如果我使用已过时(Obsolete)的 Message 对象,则 Message.Body 为 null;
但如果我使用简单的 string,就能得到正确的 JSON。
有什么建议吗?
我建议将触发器中的 BrokeredMessage 更改为 ServiceBusReceivedMessage:
// Revised signature for the Service Bus-triggered Run method: the bindings are the same
// as before; only the type of the message parameter changes to ServiceBusReceivedMessage.
public static async Task Run(
[ServiceBusTrigger(
"%TermedEmployeeTopicName%",
"%ServiceBusSubscriptionName%",
Connection = "ServiceBusConnectionString")]
ServiceBusReceivedMessage message,
[CosmosDB(
databaseName: "%CosmosDbName%",
collectionName: "%TermedEmployeesLogCollection%",
ConnectionStringSetting = "CosmosConnection")]
IAsyncCollector<TermedEmployeeLog> termedEmployeesLogCollection,
ILogger log)
我相信这会解决绑定问题。
其他上下文
从 v5.0.0 开始,Microsoft.Azure.WebJobs.Extensions.ServiceBus 包开始在内部使用 Azure.Messaging.ServiceBus。新包中没有 BrokeredMessage 类型:传入消息的类型为 ServiceBusReceivedMessage,传出消息的类型为 ServiceBusMessage。
可以在 Microsoft.Azure.WebJobs.Extensions.ServiceBus docs 中找到更多信息。