Azure WebJobs SDK:不在特定的 ServiceBus 触发函数上调用 message.complete()

Azure WebJobs SDK: Not calling message.complete() on a specific ServiceBus triggered function

我有以下形式的服务总线触发函数:

public static void ProcessJobs([ServiceBusTrigger("topicname", "subscriptionname", AccessRights.Listen)] BrokeredMessage input, [ServiceBus("topic2name", "subscription2name", AccessRights.Send)] out BrokeredMessage output)
{
    output = new BrokeredMessage(input.GetBody<String>());
}

我的用例是此函数只是从一个主题获取消息并将它们推送到另一个主题。我不想在此过程中从源主题中删除消息。

有可能实现吗?

此外,我在哪里可以找到有关 AccessRights 以及它们如何影响消息访问的更多信息。例如:在上面的示例中,我从输入主题中获取带有 AccessRights.Listen 的消息,但一旦对这些主题进行了函数调用,它似乎仍然是 "removing" 消息消息。

I don't want to remove the message from the source topic in the process.

this document可知,ServiceBusTrigger在触发完成后会调用complete函数。这是默认模式。这是该文章的片段:

The SDK receives a message in PeekLock mode and calls Complete on the message if the function finishes successfully, or calls Abandon if the function fails. If the function runs longer than the PeekLock timeout, the lock is automatically renewed.

正如 Jambor 所说,如果函数成功完成(参见 documentation),默认行为是完成消息,如果函数失败则放弃。

您可以在 SDK Repo 查看 MessageProcessor class:

的代码中看到此行为的实现
public virtual async Task CompleteProcessingMessageAsync(BrokeredMessage message, FunctionResult result, CancellationToken cancellationToken)
{
    if (result.Succeeded)
    {
        if (!MessageOptions.AutoComplete)
        {
            // AutoComplete is true by default, but if set to false
            // we need to complete the message
            cancellationToken.ThrowIfCancellationRequested();
            await message.CompleteAsync();
        }
    }
    else
    {
        cancellationToken.ThrowIfCancellationRequested();
        await message.AbandonAsync();
    }
}

有趣的一点:这是一个虚函数。

ServiceBusConfiguration 公开了 MessagingProvider 属性.

如果你看一下 SDK Repo 中默认 MessagingProvider class 的代码,你会发现你可以覆盖负责创建新的方法MessageProcessor:

/// <summary>
/// Creates a <see cref="MessageProcessor"/> for the specified ServiceBus entity.
/// </summary>
/// <param name="entityPath">The ServiceBus entity to create a <see cref="MessageProcessor"/> for.</param>
/// <returns>The <see cref="MessageProcessor"/>.</returns>
public virtual MessageProcessor CreateMessageProcessor(string entityPath)
{
    if (string.IsNullOrEmpty(entityPath))
    {
        throw new ArgumentNullException("entityPath");
    }
    return new MessageProcessor(_config.MessageOptions);
}

这个函数也是虚的

现在您可以创建自己的 MessagingProviderMessageProcessor 实现:

public class CustomMessagingProvider : MessagingProvider
{
    private readonly ServiceBusConfiguration _config;

    public CustomMessagingProvider(ServiceBusConfiguration config) : base(config)
    {
        _config = config;
    }

    public override MessageProcessor CreateMessageProcessor(string entityPath)
    {
        if (string.IsNullOrEmpty(entityPath))
        {
            throw new ArgumentNullException("entityPath");
        }
        return new CustomMessageProcessor(_config.MessageOptions);
    }

    class CustomMessageProcessor : MessageProcessor
    {
        public CustomMessageProcessor(OnMessageOptions messageOptions) : base(messageOptions)
        {
        }

        public override async Task CompleteProcessingMessageAsync(BrokeredMessage message, FunctionResult result, CancellationToken cancellationToken)
        {
            if (!result.Succeeded)
            {
                cancellationToken.ThrowIfCancellationRequested();
                await message.AbandonAsync();
            }
        }
    }
}

并像这样配置您的 JobHost :

public static void Main()
{
    var config = new JobHostConfiguration();
    var sbConfig = new ServiceBusConfiguration
    {
        MessageOptions = new OnMessageOptions
        {
            AutoComplete = false
        }
    };
    sbConfig.MessagingProvider = new CustomMessagingProvider(sbConfig);
    config.UseServiceBus(sbConfig);
    var host = new JobHost(config);
    host.RunAndBlock();
}

这是针对此问题的技术部分...

现在,如果您没有完成您的消息,该消息将一次又一次地用于相同的功能,直到您到达 MaxDeliveryCount,然后您的消息将变成死信。所以即使你把你的函数设计成幂等的,我也很确定这不是你想要的。

也许你应该多解释一下你试图达到的目标?

如果您正在寻找父子队列通信(请参阅问题评论),有一篇很好的文章解释了如何使用 ASB 设计工作流:

否则,您可以查看 BrokerMessage 对象上的 Defer 方法:

Indicates that the receiver wants to defer the processing for this message.

它将允许您保留父消息直到子消息被处理。