Azure Web 作业和队列

Azure Webjobs and Queues

我正在使用 Azure 服务总线队列(或者如果需要可能是一个主题),并且想知道如何将 Web 作业与队列一起使用。

当一条消息进入队列时,它代表一个进程,该进程将 运行 在网络作业中(或从网络作业启动)。这个过程可能很快,30 秒,也可能很慢,1 小时等等。

我可以为此使用一个 Web 作业并以某种方式说它应该 运行一次不超过 10 个这样的进程吗?

是的,您可以使用 Azure 服务总线队列或主题触发 Web 作业。 Visual Studio.

中的服务总线快速启动项目模板是一个很好的例子,可以帮助您继续前进

特别是,您想查看 Web 作业 SDK 提供的 ServiceBusTrigger 属性。

至于网络作业的可扩展性,这将根据您的网络应用程序实例进行扩展。因此,如果您说 5 个 Web 应用实例启用了 always on,那么您将拥有 5 个 Web 作业实例。作为对此的补充评论,如果您只想在 5 个 Web 应用程序实例的环境中使用一个 Web 作业实例,则可以设置 is_singleton 属性 在 settings.job 文件中设为真。

是的,您可以使用 WebJob。我创建了一个简单的 WebJob with Storage Queue 来指导如何完成它。下面的工作流将 运行 一次只处理十个进程,并将所有其他请求保存在 ConcurrentQueue 的内存中。您将必须实现将其出列并使用的逻辑

public class Functions
{
    public delegate void CompletedProcessHandler(object sender, CompletedProcessHandlerArgs args);

    static readonly Dictionary<int, CustomProcess> _dictionary =
        new Dictionary<int, CustomProcess>();
    static readonly ConcurrentQueue<ProcessEntity> _remaining =
        new ConcurrentQueue<ProcessEntity>();

    // This function will get triggered/executed when a new message is written 
    // on an Azure Queue called queue.
    public static void ProcessQueueMessage([QueueTrigger("testqueue")] ProcessEntity msg,
        TextWriter log)
    {
        if (_dictionary.Count <= 10)
        {
            var newProcess = new CustomProcess((_dictionary.Last().Key) + 1,
                msg.Duration);
        }
        else
        {
            _remaining.Enqueue(msg);
        }

    }

    public static void CompletedProcess(object sender, CompletedProcessHandlerArgs args)
    {
        _dictionary[Int32.Parse(args.ProcessID)].Dispose();
        _dictionary.Remove(Int32.Parse(args.ProcessID));
    }
}

public class CustomProcess : IDisposable
{
    public event Functions.CompletedProcessHandler OnProcessCompleted;
    private CancellationTokenSource _token;
    private string _id;
    private Timer _timer;
    public CustomProcess(int i, int duration)
    {
        _timer = new Timer { Enabled = true, Interval = duration * 1000 };
        _timer.Elapsed += Timer_Elapsed;
        _id = i.ToString();
        _token = new CancellationTokenSource();

        Task.Factory.StartNew(() => WriteMessages());
        _timer.Start();

        OnProcessCompleted += Functions.CompletedProcess;

    }

    private void Timer_Elapsed(object sender, System.Timers.ElapsedEventArgs e)
    {
        _token.Cancel();
        OnProcessCompleted?.Invoke(this, new CompletedProcessHandlerArgs(_id));
    }

    private void WriteMessages()
    {
        while (!_token.Token.IsCancellationRequested)
        {
            Console.WriteLine("Test Message from process " + _id);
        }
    }

    public void Dispose()
    {
        _token.Dispose();
        _timer.Dispose();
    }
}


public class CompletedProcessHandlerArgs : EventArgs
{
    public string ProcessID { get; set; }

    public CompletedProcessHandlerArgs(string ID)
    {
        ProcessID = ID;
    }
}


public class ProcessEntity
{
    public int Duration { get; set; }
}

在web作业的app.config中需要提供两个app设置

<add name="AzureWebJobsDashboard" 
     connectionString="DefaultEndpointsProtocol=https;AccountName=[AccountName];AccountKey=[AccountKey]" />
<add name="AzureWebJobsStorage" 
     connectionString="DefaultEndpointsProtocol=https;AccountName=[AccountName];AccountKey=[AccountKey]" />

程序文件是 Visual Studio 模板中的默认文件

public class Program
{
    // Please set the following connection strings in app.config for this WebJob to run:
    // AzureWebJobsDashboard and AzureWebJobsStorage
    static void Main()
    {
        var host = new JobHost();
        // The following code ensures that the WebJob will be running continuously
        host.RunAndBlock();
    }
}

WebJob 会在消息到达时保持出队状态。由于您一次只需要 10 到 运行,因此您必须将消息排入内存并等待 运行ning 进程完成,然后再开始一个新进程

正如@Rick 提到的,您可以在 Web 作业的 settings.job 文件中将 is_Singleton 属性 设置为 true