如何在 Elsa 工作流程中构建自定义循环 activity

How to build a custom loop activity in Elsa workflows

我正在尝试创建一个自定义 activity,它最终将执行复杂的数据库查询或 API 调用以获取大量记录并循环访问它们。我确信它可以通过内置的流程控制活动来完成,但我想让那些不知道或不关心 foreach 循环是什么的非程序员可以使用它,所以将很多功能放在一个盒子里是不错。

我的第一个尝试是从 ForEach 继承并在让 OnExecute 做它的事情之前做一些初始化,但结果感觉有点 hacky。

public class FancyForEach : ForEach
{
    private bool? Initialized
    {
        get
        {
            return GetState<bool?>("Initialized");
        }
        set
        {
            SetState(value, "Initialized");
        }
    }

    protected override IActivityExecutionResult OnExecute(ActivityExecutionContext context)
    {
        if (Initialized != true)
        {
            Items = GetThingsFromDatabase();
            Initialized = true;
        }

        return base.OnExecute(context);
    }

    protected List<DatabaseThings> GetThingsFromDatabase()
    {
        // Fancy stuff here, including paging eventually.
    }
}

似乎在 activity 中的某处实例化 ForEach 而不是继承它会更简洁一些,但我想不出一种方法来实现它。我想一个不错的解决方案是为每条记录触发另一个工作流程,但我不想那样做,再次让非程序员的人更容易理解。

任何人都可以就完成这项工作的最佳方式提出建议吗?这是我使用 Elsa 的第一个项目,所以我可能从一个完全错误的方向接近它!

如果我理解正确,您的 activity 负责加载数据并对其进行循环,而 activity 的用户应该能够指定每次迭代中发生的情况。

如果是这样,那么您可能会执行如下操作:

[Activity(
    Category = "Control Flow",
    Description = "Iterate over a collection.",
    Outcomes = new[] { OutcomeNames.Iterate, OutcomeNames.Done }
)]
public class FancyForEach : Activity
{
    private bool? Initialized
    {
        get => GetState<bool?>();
        set => SetState(value);
    }
    
    private IList<DatabaseThings>? Items
    {
        get => GetState<IList<DatabaseThings>?>();
        set => SetState(value);
    }
    
    private int? CurrentIndex
    {
        get => GetState<int?>();
        set => SetState(value);
    }
    
    protected override IActivityExecutionResult OnExecute(ActivityExecutionContext context)
    {
        if (Initialized != true)
        {
            Items = GetThingsFromDatabase();
            Initialized = true;
        }
        
        var collection = Items.ToList();
        var currentIndex = CurrentIndex ?? 0;

        if (currentIndex < collection.Count)
        {
            var currentValue = collection[currentIndex];
            var scope = context.CreateScope();

            scope.Variables.Set("CurrentIndex", currentIndex);
            scope.Variables.Set("CurrentValue", currentValue);

            CurrentIndex = currentIndex + 1;
            context.JournalData.Add("Current Index", currentIndex);

            // For each iteration, return an outcome to which the user can connect activities to.
            return Outcome(OutcomeNames.Iterate, currentValue);
        }

        CurrentIndex = null;
        return Done();
    }
    
    protected List<DatabaseThings> GetThingsFromDatabase()
    {
        // Fancy stuff here, including paging eventually.
    }
}

此示例将数据库项加载到内存中一次,然后将此列表存储在工作流状态中(通过 Items)- 这可能是理想的,也可能不是理想的,因为这有可能增加工作流实例显着取决于每条记录的大小和记录数。

一种更具可扩展性的方法是每次迭代只加载一个项目,跟踪加载的当前索引,递增它(即页面大小为 1 的分页)。