Custom "signal received" activity not working

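I want to implement a custom activity that behaves like Signal Received: it suspends the workflow, and the workflow resumes and continues when I call it from a client (Postman). I don't want to use the existing Signal Received activity because I need some extra logic to run in the same activity before the workflow is suspended. I followed the guide here.

So I implemented a Signal Custom activity based on the default SignalReceived activity.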

using System;
using Elsa.Activities.Signaling.Models;
using Elsa.ActivityResults;
using Elsa.Attributes;
using Elsa.Expressions;
using Elsa.Services;
using Elsa.Services.Models;

namespace Elsa.Workflows.CustomActivities.Signals
{
    [Trigger(
        Category = "Custom",
        DisplayName = "Signal Custom",
        Description = "Custom - Suspend workflow execution until the specified signal is received.",
        Outcomes = new[] { OutcomeNames.Done }
    )]
    public class SignalCustom : Activity
    {
        [ActivityInput(Hint = "The name of the signal to wait for.", SupportedSyntaxes = new[] { SyntaxNames.JavaScript, SyntaxNames.Liquid })]
        public string Signal { get; set; } = default!;

        [ActivityOutput(Hint = "The input that was received with the signal.")]
        public object SignalInput { get; set; }

        [ActivityOutput] public object Output { get; set; }

        protected override bool OnCanExecute(ActivityExecutionContext context)
        {
            if (context.Input is Signal triggeredSignal)
                return string.Equals(triggeredSignal.SignalName, Signal, StringComparison.OrdinalIgnoreCase);

            return false;
        }

        protected override IActivityExecutionResult OnExecute(ActivityExecutionContext context) => context.WorkflowExecutionContext.IsFirstPass ? OnResume(context) : Suspend();

        protected override IActivityExecutionResult OnResume(ActivityExecutionContext context)
        {
            var triggeredSignal = context.GetInput<Signal>()!;
            SignalInput = triggeredSignal.Input;
            Output = triggeredSignal.Input;
            context.LogOutputProperty(this, nameof(Output), Output);
            return Done();
        }
    }
}

I created SignalCustomBookmark.cs:

using Elsa.Services;

namespace Elsa.Workflows.CustomActivities.Signals.Bookmark
{
    public class SignalCustomBookmark : IBookmark
    {
        public string Signal { get; set; } = default!;
    }
}

And SignalCustomBookmarkProvider.cs:

using Elsa.Services;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

namespace Elsa.Workflows.CustomActivities.Signals.Bookmark
{
    public class SignalCustomBookmarkProvider : BookmarkProvider<SignalCustomBookmark, SignalCustom>
    {
        public override async ValueTask<IEnumerable<BookmarkResult>> GetBookmarksAsync(BookmarkProviderContext<SignalCustom> context, CancellationToken cancellationToken) => await GetBookmarksInternalAsync(context, cancellationToken).ToListAsync(cancellationToken);

        private async IAsyncEnumerable<BookmarkResult> GetBookmarksInternalAsync(BookmarkProviderContext<SignalCustom> context, [EnumeratorCancellation] CancellationToken cancellationToken)
        {
            var signalName = (await context.ReadActivityPropertyAsync(x => x.Signal, cancellationToken))?.ToLowerInvariant().Trim();

            // Can't do anything with an empty signal name.
            if (string.IsNullOrEmpty(signalName))
                yield break;

            yield return Result(new SignalCustomBookmark
            {
                Signal = signalName
            });
        }
    }
}

I also registered the new bookmark provider in ConfigureServices(IServiceCollection services):

services.AddBookmarkProvider<SignalCustomBookmarkProvider>();

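I created a test workflow and added the custom Signal Custom activity to it, with test-signal as the signal.

I can start the workflow through the API and that works fine: once the instance reaches Signal Custom, the workflow is suspended. I start the workflow with this call from Postman: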

https://localhost:5001/v1/workflows/{workflow_id}/execute

But I want to resume it by triggering this call:

https://localhost:5001/v1/signals/test-signal/execute

with this body:

{
    "WorkflowInstanceId": "{Workflow_Instance_Id}"
}

Postman returns 200 OK with this body:

{
    "$id": "1",
    "startedWorkflows": []
}

Could you guide me on how to resume the workflow from a client?

The https://localhost:5001/v1/signals/test-signal/execute endpoint does not work for you because internally it uses ISignaler:

await _signaler.TriggerSignalAsync(signalName, request.Input, request.WorkflowInstanceId, request.CorrelationId)

The default implementation of ISignaler in turn does the following:

var normalizedSignal = signal.ToLowerInvariant();

return await _workflowLaunchpad.CollectAndExecuteWorkflowsAsync(new WorkflowsQuery(
    nameof(SignalReceived),
    new SignalReceivedBookmark { Signal = normalizedSignal },
    correlationId,
    workflowInstanceId,
    default,
    TenantId
), new WorkflowInput(new Signal(normalizedSignal, input)), cancellationToken);

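Notice that the snippet above builds the workflow query using the SignalReceived activity and the SignalReceivedBookmark bookmark.

Those query parameters are not the ones you need in order to trigger workflows that start with, or are blocked on, your custom SignalCustom activity and SignalCustomBookmark bookmark type. That is why the response contains an empty startedWorkflows list.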
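To make the mismatch concrete, here is a minimal, self-contained sketch. The types below (StoredBookmark, BookmarkIndex) are hypothetical stand-ins and not Elsa's actual classes; they only model the idea that a suspended workflow is looked up by the pair of activity type name and bookmark payload, which is what the snippets above suggest.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for a persisted bookmark; NOT Elsa's real type.
public record StoredBookmark(string ActivityType, string Signal, string WorkflowInstanceId);

public static class BookmarkIndex
{
    // Returns the IDs of instances whose bookmark matches both the activity type
    // and the (lowercased) signal name.
    public static List<string> Find(IEnumerable<StoredBookmark> bookmarks, string activityType, string signal) =>
        bookmarks
            .Where(b => b.ActivityType == activityType && b.Signal == signal.ToLowerInvariant())
            .Select(b => b.WorkflowInstanceId)
            .ToList();
}

public static class Demo
{
    public static void Main()
    {
        // The suspended workflow stored a bookmark for the custom activity.
        var bookmarks = new[] { new StoredBookmark("SignalCustom", "test-signal", "instance-1") };

        // What the built-in endpoint effectively asks for: nothing matches.
        Console.WriteLine(BookmarkIndex.Find(bookmarks, "SignalReceived", "test-signal").Count); // prints 0

        // What a custom signaler would ask for: the suspended instance is found.
        Console.WriteLine(BookmarkIndex.Find(bookmarks, "SignalCustom", "test-signal").Count); // prints 1
    }
}
```

Under this simplified model, the empty startedWorkflows array in the question is what the first lookup returns.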

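In other words, you need to take two more steps to make this work:

  1. Implement a custom controller (assuming you want to trigger the custom signal from a controller ;)
  2. Call IWorkflowLaunchpad.CollectAndExecuteWorkflowsAsync yourself instead of relying on ISignaler

Better yet: define a new service called ICustomSignaler and let its implementation do the work.

For example: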

public interface ICustomSignaler
{
   /// <summary>
   /// Runs all workflows that start with or are blocked on the <see cref="SignalCustom"/> activity.
   /// </summary>
   Task<IEnumerable<CollectedWorkflow>> TriggerSignalAsync(string signal, object? input = null, string? workflowInstanceId = null, string? correlationId = null, CancellationToken cancellationToken = default);
}

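public class CustomSignaler : ICustomSignaler
{
    private readonly IWorkflowLaunchpad _workflowLaunchpad;

    // IWorkflowLaunchpad is resolved from the service container.
    public CustomSignaler(IWorkflowLaunchpad workflowLaunchpad)
    {
        _workflowLaunchpad = workflowLaunchpad;
    }

    public async Task<IEnumerable<CollectedWorkflow>> TriggerSignalAsync(string signal, object? input = default, string? workflowInstanceId = default, string? correlationId = default, CancellationToken cancellationToken = default)
    {
        var normalizedSignal = signal.ToLowerInvariant();

        // Query by the custom activity type and bookmark instead of SignalReceived / SignalReceivedBookmark.
        return await _workflowLaunchpad.CollectAndExecuteWorkflowsAsync(new WorkflowsQuery(
            nameof(SignalCustom),
            new SignalCustomBookmark { Signal = normalizedSignal },
            correlationId,
            workflowInstanceId,
            default,
            TenantId // carried over from the default ISignaler snippet above; not defined in this snippet
        ), new WorkflowInput(new Signal(normalizedSignal, input)), cancellationToken);
    }
}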
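The new service presumably also has to be registered with the container before the controller can receive it. A minimal sketch, assuming it is registered next to the bookmark provider in ConfigureServices and that a scoped lifetime is appropriate (the lifetime is an assumption; match however your other workflow services are registered):

```csharp
// In ConfigureServices(IServiceCollection services):

// Already present in the question.
services.AddBookmarkProvider<SignalCustomBookmarkProvider>();

// Assumption: scoped lifetime. ICustomSignaler and CustomSignaler are the types defined above.
services.AddScoped<ICustomSignaler, CustomSignaler>();
```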

Your custom controller might look something like this:

[ApiController]
[Route("custom-signals/{signalName}/execute")]
[Produces("application/json")]
public class Execute : Controller
{
    private readonly ICustomSignaler _signaler;
    private readonly IEndpointContentSerializerSettingsProvider _serializerSettingsProvider;

    public Execute(ICustomSignaler signaler, IEndpointContentSerializerSettingsProvider serializerSettingsProvider)
    {
        _signaler = signaler;
        _serializerSettingsProvider = serializerSettingsProvider;
    }

    [HttpPost]
    public async Task<IActionResult> Handle(string signalName, ExecuteSignalRequest request, CancellationToken cancellationToken = default)
    {
        var result = await _signaler.TriggerSignalAsync(signalName, request.Input, request.WorkflowInstanceId, request.CorrelationId, cancellationToken).ToList();

        if (Response.HasStarted)
            return new EmptyResult();

        return Json(
            new ExecuteSignalResponse(result.Select(x => new CollectedWorkflow(x.WorkflowInstanceId, x.ActivityId)).ToList()),
            _serializerSettingsProvider.GetSettings());
    }
}
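With a controller like the one above in place, the client would call the new route rather than v1/signals. The following is a small sketch of building that request in C#; CustomSignalClient and BuildRequest are hypothetical helper names, and the base URL and body shape are taken from the question. The same request can be sent from Postman.

```csharp
using System;
using System.Net.Http;
using System.Text;
using System.Text.Json;

// Hypothetical helper, not part of Elsa.
public static class CustomSignalClient
{
    // Builds the POST request for the custom endpoint; sending it is left to the caller.
    public static HttpRequestMessage BuildRequest(string baseUrl, string signalName, string workflowInstanceId)
    {
        // Route taken from the controller above: custom-signals/{signalName}/execute
        var url = $"{baseUrl.TrimEnd('/')}/custom-signals/{Uri.EscapeDataString(signalName)}/execute";

        // Same body shape as the request shown in the question.
        var body = JsonSerializer.Serialize(new { WorkflowInstanceId = workflowInstanceId });

        return new HttpRequestMessage(HttpMethod.Post, url)
        {
            Content = new StringContent(body, Encoding.UTF8, "application/json")
        };
    }
}
```

Passing the result of BuildRequest("https://localhost:5001", "test-signal", instanceId) to HttpClient.SendAsync sends the call. If the custom signaler finds the suspended instance, the response should list it instead of returning an empty array.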

We are tracking two issues to see whether we can provide a way for users to extend the SignalReceived activity without this much custom code: