将 Azure 持久函数用于 ETL 过程

Using Azure durable functions for ETL process

我有以下场景:

我必须执行一个函数来检索 N(N 介于 0 和无限之间)条记录。我必须调用映射函数将记录转换为其他内容并将它们向前移动(通过 http、服务总线、cosmos db 等)

由于 10 分钟的限制,我无法使用常规的 Azure Functions,所以我正在寻找 Durable Functions 是否可以解决我的问题。

我的想法是:
1 - 当持久函数触发时,它从数据库中流式传输记录。
2 - 对于每条记录,它调用映射函数。
3 - 映射后,它通过服务总线将记录发送到消息。

作为概念证明,我做了下面的例子。我模拟在持久函数中接收 1000 条消息,但它的行为方式非常不可靠。如果我发送 1000 条消息,函数有点崩溃或完成时间太长,我希望这段代码几乎立即完成。

#r "Microsoft.Azure.WebJobs.Extensions.DurableTask"

public static async Task<List<string>> Run(DurableOrchestrationContext context, TraceWriter log)
{
    var outputs = new List<string>();

    var tasks = new List<Task<string>>();
    for(int i = 0; i < 1000; i++)
    {
        log.Info(i.ToString());
        tasks.Add(context.CallActivityAsync<string>("Hello", i.ToString()));
    }

    outputs.AddRange(await Task.WhenAll(tasks.ToArray()));

    return outputs;
}

我的问题是:Durable Functions 是否适合这种情况? 我是否应该研究一些非无服务函数方法来从数据库中提取数据?

有没有办法从 Durable Function 中同步调用另一个 Azure 函数?

开始之前,您必须考虑 Durable Functions 的真正工作原理。要了解流程,请看以下示例:

#r "Microsoft.Azure.WebJobs.Extensions.DurableTask"

public static async Task Run(DurableOrchestrationContext context, TraceWriter log)
{
    await context.CallActivityAsync<string>("Hello1");
    await context.CallActivityAsync<string>("Hello2");
}

运行时的工作方式如下:

  1. 它进入编排并命中第一个await,其中一个activityHello1被调用
  2. 控制返回到名为 Dispatcher 的组件,它是框架的内部部分。它检查是否针对当前编排 ID 调用了这个特定的 activity。如果不是,它等待结果并释放编排使用的资源
  3. 一旦等待 Task 完成,Dispatcher 重新创建编排并从头重播
  4. 它再次等待 activity Hello1,但是这次在查询编排历史后它知道,它已经被调用并且结果被保存 - 它使用保存的结果并继续执行
  5. 它击中了第二个 await 整个循环再次进行

如您所见,幕后有大量工作要做。在将工作委派给编排和活动时,还有一条经验法则:

  • 编排应该只编排 - 因为它有很多限制,比如单线程,只等待安全任务(这意味着 DurableOrchestrationContext 类型)并在多个队列(而不是 VM)之间缩放。更重要的是它必须是幂等的(所以它不能使用例如 DateTime.Now 或直接查询数据库)
  • activity 应该执行工作 - 它作为一个典型的函数工作(没有编排限制)并扩展到多个不同的 VM

在您的场景中,您应该只执行一个 activity,它将完成所有工作,而不是在编排中循环遍历记录(特别是因为您不能在编排中使用绑定到例如服务总线- 但是您可以在 activity 中这样做,它可以获取数据、转换数据然后推送到您想要的任何类型的服务)。所以在你的代码中你可以有这样的东西:

[FunctionName("Orchestration_Client")]
public static async Task<string> Orchestration_Client(
    [HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "start")] HttpRequestMessage input,
    [OrchestrationClient] DurableOrchestrationClient starter)
{
    return await starter.StartNewAsync("Orchestration", await input.Content.ReadAsStringAsync());
}

[FunctionName("Orchestration")]
public static async Task Orchestration_Start([OrchestrationTrigger] DurableOrchestrationContext context)
{
    var payload = context.GetInput<string>();
    await context.CallActivityAsync(nameof(Activity), payload);
}

[FunctionName("Activity")]
public static string Activity(
    [ActivityTrigger] DurableActivityContext context,
    [Table(TableName, Connection = "TableStorageConnectionName")] IAsyncCollector<FooEntity> foo)
{
    // Get data from request
    var payload = context.GetInput<string>();

    // Fetch data from database
    using(var conn = new SqlConnection())
    ...

    // Transform it
    foreach(var record in databaseResult) 
    {
        // Do some work and push data
        await foo.AddAsync(new FooEntity() { // Properties });
    }

    // Result
    return $"Processed {count} records!!";
}

这与其说是一个真实的例子,不如说是一个想法,但你应该能明白要点。另一件事是 Durable Functions 是否真的是此类操作的最佳解决方案 - 我相信有更好的服务,例如 Azure Data Factory。

此外,您可以按照此处的示例进行详细操作

https://sps-cloud-architect.blogspot.com/2019/12/azure-data-load-etl-process-using-azure.html