从 Service Bus 队列读取并写入 .Net 中的不同 SB 队列
Read from Service Bus Queue and Write to a different SB Queue in .Net
我有一个业务场景,其中第三方客户端 API 正在调用通用 ServiceBus 队列并发布消息。我想在 Azure 函数中读取这些消息并根据业务规则将这些消息写入不同的服务总线队列。
为此,我在 .Net 中创建了以下 Azure 函数 (v3)
namespace AzureFunctionTesting
{
public class Function1
{
[FunctionName("Function1")]
public async Task<IActionResult> Run(
[ServiceBusTrigger("QueueName", Connection = "StorageQueueConnectionString")] string myQueueItem,
[ServiceBus("Productqueue", Connection = "QueueConnectionString")] IAsyncCollector<dynamic> outputQueue1,
[ServiceBus("Supplierqueue", Connection = "QueueConnectionString")] IAsyncCollector<dynamic> outputQueue2,
ILogger log)
{
log.LogInformation($"C# ServiceBus queue trigger function processed message: {myQueueItem}");
var document = JsonConvert.DeserializeObject<ShippingSKU>(myQueueItem);
string connectionString = Environment.GetEnvironmentVariable("QueueConnectionString");
ServiceBusClient client = new ServiceBusClient(connectionString);
ServiceBusSender senderQueue1 = client.CreateSender("queue1");
ServiceBusSender senderQueue2 = client.CreateSender("queue2");
using ServiceBusMessageBatch messageBatchQueue1 = await senderQueue1.CreateMessageBatchAsync();
if (document.Id == 1)
{
Console.WriteLine("Entering into Queue 1");
Console.WriteLine(document.FirstName);
await outputQueue1.AddAsync(document);
}
else
{
Console.WriteLine("Entering into Queue 2");
Console.WriteLine(document.LastName);
await outputQueue2.AddAsync(document);
}
return new OkObjectResult(null);
}
}
}
但是当我尝试 运行 azure 函数时,出现了这个错误
我是 Azure Functions 的新手,非常感谢任何帮助。
谢谢
编辑:
肖恩的简短回答帮助我解决了这个问题。我过度思考并使解决方案过于复杂。令人惊讶的是,在互联网上,我什至找不到一个示例,即代码从 1 个队列读取并写入另一个队列的场景。为了完整起见,我在下面添加了工作代码。
namespace AzureFunctionTesting
{
public class Function1
{
[FunctionName("Function1")]
public async Task Run(
[ServiceBusTrigger("QueueName", Connection = "QueueConnectionString")] string myQueueItem,
[ServiceBus("productqueue", Connection = "QueueConnectionString")] IAsyncCollector<dynamic> outputQueue1,
[ServiceBus("supplierqueue", Connection = "QueueConnectionString")] IAsyncCollector<dynamic> outputQueue2,
ILogger log)
{
log.LogInformation($"C# ServiceBus queue trigger function processed message: {myQueueItem}");
var document = JsonConvert.DeserializeObject<ShippingSKU>(myQueueItem);
if (document.Id == 1)
{
Console.WriteLine("Entering into Queue 1");
Console.WriteLine(document.FirstName);
await outputQueue1.AddAsync(document);
}
else
{
Console.WriteLine("Entering into Queue 2");
Console.WriteLine(document.LastName);
await outputQueue2.AddAsync(document);
}
}
}
}
- 您将服务总线触发器与 return 类型混合使用,看起来像 HTTP 触发器。删除
IActionResult
作为您的 return 类型。
IAsyncCollector
是您发送消息的方式,不要实例化新的 ServiceBusClient
和发件人。
我有一个业务场景,其中第三方客户端 API 正在调用通用 ServiceBus 队列并发布消息。我想在 Azure 函数中读取这些消息并根据业务规则将这些消息写入不同的服务总线队列。
为此,我在 .Net 中创建了以下 Azure 函数 (v3)
namespace AzureFunctionTesting
{
public class Function1
{
[FunctionName("Function1")]
public async Task<IActionResult> Run(
[ServiceBusTrigger("QueueName", Connection = "StorageQueueConnectionString")] string myQueueItem,
[ServiceBus("Productqueue", Connection = "QueueConnectionString")] IAsyncCollector<dynamic> outputQueue1,
[ServiceBus("Supplierqueue", Connection = "QueueConnectionString")] IAsyncCollector<dynamic> outputQueue2,
ILogger log)
{
log.LogInformation($"C# ServiceBus queue trigger function processed message: {myQueueItem}");
var document = JsonConvert.DeserializeObject<ShippingSKU>(myQueueItem);
string connectionString = Environment.GetEnvironmentVariable("QueueConnectionString");
ServiceBusClient client = new ServiceBusClient(connectionString);
ServiceBusSender senderQueue1 = client.CreateSender("queue1");
ServiceBusSender senderQueue2 = client.CreateSender("queue2");
using ServiceBusMessageBatch messageBatchQueue1 = await senderQueue1.CreateMessageBatchAsync();
if (document.Id == 1)
{
Console.WriteLine("Entering into Queue 1");
Console.WriteLine(document.FirstName);
await outputQueue1.AddAsync(document);
}
else
{
Console.WriteLine("Entering into Queue 2");
Console.WriteLine(document.LastName);
await outputQueue2.AddAsync(document);
}
return new OkObjectResult(null);
}
}
}
但是当我尝试 运行 azure 函数时,出现了这个错误
我是 Azure Functions 的新手,非常感谢任何帮助。 谢谢
编辑: 肖恩的简短回答帮助我解决了这个问题。我过度思考并使解决方案过于复杂。令人惊讶的是,在互联网上,我什至找不到一个示例,即代码从 1 个队列读取并写入另一个队列的场景。为了完整起见,我在下面添加了工作代码。
namespace AzureFunctionTesting
{
public class Function1
{
[FunctionName("Function1")]
public async Task Run(
[ServiceBusTrigger("QueueName", Connection = "QueueConnectionString")] string myQueueItem,
[ServiceBus("productqueue", Connection = "QueueConnectionString")] IAsyncCollector<dynamic> outputQueue1,
[ServiceBus("supplierqueue", Connection = "QueueConnectionString")] IAsyncCollector<dynamic> outputQueue2,
ILogger log)
{
log.LogInformation($"C# ServiceBus queue trigger function processed message: {myQueueItem}");
var document = JsonConvert.DeserializeObject<ShippingSKU>(myQueueItem);
if (document.Id == 1)
{
Console.WriteLine("Entering into Queue 1");
Console.WriteLine(document.FirstName);
await outputQueue1.AddAsync(document);
}
else
{
Console.WriteLine("Entering into Queue 2");
Console.WriteLine(document.LastName);
await outputQueue2.AddAsync(document);
}
}
}
}
- 您将服务总线触发器与 return 类型混合使用,看起来像 HTTP 触发器。删除
IActionResult
作为您的 return 类型。 IAsyncCollector
是您发送消息的方式,不要实例化新的ServiceBusClient
和发件人。