自定义 "signal received" activity 无效
Custom "signal received" activity not working
我想实现一个自定义 activity,其行为类似于 Signal Received
– 意味着它“暂停”工作流,当我从客户端(邮递员)调用它以恢复并继续时。
我不想使用现有的 Signal Received
,因为我想在同一操作中暂停工作流之前有一些额外的逻辑。
我遵循了 here.
中的指南
所以我在SignalReceived默认activity的基础上实现了Signal Custom
activity。
using System;
using Elsa.Activities.Signaling.Models;
using Elsa.ActivityResults;
using Elsa.Attributes;
using Elsa.Expressions;
using Elsa.Services;
using Elsa.Services.Models;
namespace Elsa.Workflows.CustomActivities.Signals
{
[Trigger(
Category = "Custom",
DisplayName = "Signal Custom",
Description = "Custom - Suspend workflow execution until the specified signal is received.",
Outcomes = new[] { OutcomeNames.Done }
)]
public class SignalCustom : Activity
{
[ActivityInput(Hint = "The name of the signal to wait for.", SupportedSyntaxes = new[] { SyntaxNames.JavaScript, SyntaxNames.Liquid })]
public string Signal { get; set; } = default!;
[ActivityOutput(Hint = "The input that was received with the signal.")]
public object SignalInput { get; set; }
[ActivityOutput] public object Output { get; set; }
protected override bool OnCanExecute(ActivityExecutionContext context)
{
if (context.Input is Signal triggeredSignal)
return string.Equals(triggeredSignal.SignalName, Signal, StringComparison.OrdinalIgnoreCase);
return false;
}
protected override IActivityExecutionResult OnExecute(ActivityExecutionContext context) => context.WorkflowExecutionContext.IsFirstPass ? OnResume(context) : Suspend();
protected override IActivityExecutionResult OnResume(ActivityExecutionContext context)
{
var triggeredSignal = context.GetInput<Signal>()!;
SignalInput = triggeredSignal.Input;
Output = triggeredSignal.Input;
context.LogOutputProperty(this, nameof(Output), Output);
return Done();
}
}
}
我创建了SignalCustomBookmark.cs
:
using Elsa.Services;
namespace Elsa.Workflows.CustomActivities.Signals.Bookmark
{
public class SignalCustomBookmark : IBookmark
{
public string Signal { get; set; } = default!;
}
}
还有SignalCustomBookmarkProvider.cs
:
using Elsa.Services;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
namespace Elsa.Workflows.CustomActivities.Signals.Bookmark
{
public class SignalCustomBookmarkProvider : BookmarkProvider<SignalCustomBookmark, SignalCustom>
{
public override async ValueTask<IEnumerable<BookmarkResult>> GetBookmarksAsync(BookmarkProviderContext<SignalCustom> context, CancellationToken cancellationToken) => await GetBookmarksInternalAsync(context, cancellationToken).ToListAsync(cancellationToken);
private async IAsyncEnumerable<BookmarkResult> GetBookmarksInternalAsync(BookmarkProviderContext<SignalCustom> context, [EnumeratorCancellation] CancellationToken cancellationToken)
{
var signalName = (await context.ReadActivityPropertyAsync(x => x.Signal, cancellationToken))?.ToLowerInvariant().Trim();
// Can't do anything with an empty signal name.
if (string.IsNullOrEmpty(signalName))
yield break;
yield return Result(new SignalCustomBookmark
{
Signal = signalName
});
}
}
}
我也在ConfigureServices(IServiceCollection services)
注册了新书签:
services.AddBookmarkProvider<SignalCustomBookmarkProvider>();
我创建了一个测试工作流程并添加了此自定义 Signal Custom
,其中 test-signal
作为信号。
我可以使用 api 启动工作流,它工作正常 - 一旦实例到达 Signal Custom
,工作流就会暂停。
我使用 Postman 的这个调用启动工作流程:
https://localhost:5001/v1/workflows/{workflow_id}/execute
但是我想通过触发这个调用来恢复它
https://localhost:5001/v1/signals/test-signal/execute
有了这个身材
{
"WorkflowInstanceId": "{Workflow_Instance_Id}"
}
Postman returns 200Ok 这个身材
{
"$id": "1",
"startedWorkflows": []
}
能否指导我如何从客户端恢复工作流程?
https://localhost:5001/v1/signals/test-signal/execute
端点对您不起作用,因为在内部,它使用 ISignaler
:
await _signaler.TriggerSignalAsync(signalName, request.Input, request.WorkflowInstanceId, request.CorrelationId)
ISignaler
的默认实现依次执行以下操作:
var normalizedSignal = signal.ToLowerInvariant();
return await _workflowLaunchpad.CollectAndExecuteWorkflowsAsync(new WorkflowsQuery(
nameof(SignalReceived),
new SignalReceivedBookmark { Signal = normalizedSignal },
correlationId,
workflowInstanceId,
default,
TenantId
), new WorkflowInput(new Signal(normalizedSignal, input)), cancellationToken);
请注意,上面的代码片段正在使用 SignalReceived activity 和 SignalReceivedBookmark 书签构建工作流查询。
这些查询参数与您触发工作流程所需的查询参数不同,这些工作流程以您的自定义 SignalCustom activity 和 SignalCustomBookmark 开始或阻止 书签类型。
换句话说,您需要再执行两个步骤才能使其生效:
- 实现自定义控制器(假设您想从控制器触发自定义信号;)
- 自己调用 IWorkflowLaunchpad.CollectAndExecuteWorkflowsAsync 而不是依赖
ISignaler
。
更好的是:定义一个名为 ICustomSignaler
的新服务并让其实现完成工作。
例如:
public interface ICustomSignaler
{
/// <summary>
/// Runs all workflows that start with or are blocked on the <see cref="SignalCustom"/> activity.
/// </summary>
Task<IEnumerable<CollectedWorkflow>> TriggerSignalAsync(string signal, object? input = null, string? workflowInstanceId = null, string? correlationId = null, CancellationToken cancellationToken = default);
}
public class CustomSignaler : ICustomSignaler
{
public async Task<IEnumerable<CollectedWorkflow>> TriggerSignalAsync(string signal, object? input = default, string? workflowInstanceId = default, string? correlationId = default, CancellationToken cancellationToken = default)
{
var normalizedSignal = signal.ToLowerInvariant();
return await _workflowLaunchpad.CollectAndExecuteWorkflowsAsync(new WorkflowsQuery(
nameof(SignalCustom ),
new SignalCustomBookmark{ Signal = normalizedSignal },
correlationId,
workflowInstanceId,
default,
TenantId
), new WorkflowInput(new Signal(normalizedSignal, input)), cancellationToken);
}
您的自定义控制器可能看起来像这样:
[ApiController]
[Route("custom-signals/{signalName}/execute")]
[Produces("application/json")]
public class Execute : Controller
{
private readonly ICustomSignaler _signaler;
private readonly IEndpointContentSerializerSettingsProvider _serializerSettingsProvider;
public Execute(ICustomSignaler signaler, IEndpointContentSerializerSettingsProvider serializerSettingsProvider)
{
_signaler = signaler;
_serializerSettingsProvider = serializerSettingsProvider;
}
[HttpPost]
public async Task<IActionResult> Handle(string signalName, ExecuteSignalRequest request, CancellationToken cancellationToken = default)
{
var result = await _signaler.TriggerSignalAsync(signalName, request.Input, request.WorkflowInstanceId, request.CorrelationId, cancellationToken).ToList();
if (Response.HasStarted)
return new EmptyResult();
return Json(
new ExecuteSignalResponse(result.Select(x => new CollectedWorkflow(x.WorkflowInstanceId, x.ActivityId)).ToList()),
_serializerSettingsProvider.GetSettings());
}
}
我们正在跟踪两个问题,看看我们是否可以提供一种方法来允许用户扩展 SignalReceived activity 而无需过多的自定义代码:
我想实现一个自定义 activity,其行为类似于 Signal Received
– 意味着它“暂停”工作流,当我从客户端(邮递员)调用它以恢复并继续时。
我不想使用现有的 Signal Received
,因为我想在同一操作中暂停工作流之前有一些额外的逻辑。
我遵循了 here.
所以我在SignalReceived默认activity的基础上实现了Signal Custom
activity。
using System;
using Elsa.Activities.Signaling.Models;
using Elsa.ActivityResults;
using Elsa.Attributes;
using Elsa.Expressions;
using Elsa.Services;
using Elsa.Services.Models;
namespace Elsa.Workflows.CustomActivities.Signals
{
[Trigger(
Category = "Custom",
DisplayName = "Signal Custom",
Description = "Custom - Suspend workflow execution until the specified signal is received.",
Outcomes = new[] { OutcomeNames.Done }
)]
public class SignalCustom : Activity
{
[ActivityInput(Hint = "The name of the signal to wait for.", SupportedSyntaxes = new[] { SyntaxNames.JavaScript, SyntaxNames.Liquid })]
public string Signal { get; set; } = default!;
[ActivityOutput(Hint = "The input that was received with the signal.")]
public object SignalInput { get; set; }
[ActivityOutput] public object Output { get; set; }
protected override bool OnCanExecute(ActivityExecutionContext context)
{
if (context.Input is Signal triggeredSignal)
return string.Equals(triggeredSignal.SignalName, Signal, StringComparison.OrdinalIgnoreCase);
return false;
}
protected override IActivityExecutionResult OnExecute(ActivityExecutionContext context) => context.WorkflowExecutionContext.IsFirstPass ? OnResume(context) : Suspend();
protected override IActivityExecutionResult OnResume(ActivityExecutionContext context)
{
var triggeredSignal = context.GetInput<Signal>()!;
SignalInput = triggeredSignal.Input;
Output = triggeredSignal.Input;
context.LogOutputProperty(this, nameof(Output), Output);
return Done();
}
}
}
我创建了SignalCustomBookmark.cs
:
using Elsa.Services;
namespace Elsa.Workflows.CustomActivities.Signals.Bookmark
{
public class SignalCustomBookmark : IBookmark
{
public string Signal { get; set; } = default!;
}
}
还有SignalCustomBookmarkProvider.cs
:
using Elsa.Services;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
namespace Elsa.Workflows.CustomActivities.Signals.Bookmark
{
public class SignalCustomBookmarkProvider : BookmarkProvider<SignalCustomBookmark, SignalCustom>
{
public override async ValueTask<IEnumerable<BookmarkResult>> GetBookmarksAsync(BookmarkProviderContext<SignalCustom> context, CancellationToken cancellationToken) => await GetBookmarksInternalAsync(context, cancellationToken).ToListAsync(cancellationToken);
private async IAsyncEnumerable<BookmarkResult> GetBookmarksInternalAsync(BookmarkProviderContext<SignalCustom> context, [EnumeratorCancellation] CancellationToken cancellationToken)
{
var signalName = (await context.ReadActivityPropertyAsync(x => x.Signal, cancellationToken))?.ToLowerInvariant().Trim();
// Can't do anything with an empty signal name.
if (string.IsNullOrEmpty(signalName))
yield break;
yield return Result(new SignalCustomBookmark
{
Signal = signalName
});
}
}
}
我也在ConfigureServices(IServiceCollection services)
注册了新书签:
services.AddBookmarkProvider<SignalCustomBookmarkProvider>();
我创建了一个测试工作流程并添加了此自定义 Signal Custom
,其中 test-signal
作为信号。
我可以使用 api 启动工作流,它工作正常 - 一旦实例到达 Signal Custom
,工作流就会暂停。
我使用 Postman 的这个调用启动工作流程:
https://localhost:5001/v1/workflows/{workflow_id}/execute
但是我想通过触发这个调用来恢复它
https://localhost:5001/v1/signals/test-signal/execute
有了这个身材
{
"WorkflowInstanceId": "{Workflow_Instance_Id}"
}
Postman returns 200Ok 这个身材
{
"$id": "1",
"startedWorkflows": []
}
能否指导我如何从客户端恢复工作流程?
https://localhost:5001/v1/signals/test-signal/execute
端点对您不起作用,因为在内部,它使用 ISignaler
:
await _signaler.TriggerSignalAsync(signalName, request.Input, request.WorkflowInstanceId, request.CorrelationId)
ISignaler
的默认实现依次执行以下操作:
var normalizedSignal = signal.ToLowerInvariant();
return await _workflowLaunchpad.CollectAndExecuteWorkflowsAsync(new WorkflowsQuery(
nameof(SignalReceived),
new SignalReceivedBookmark { Signal = normalizedSignal },
correlationId,
workflowInstanceId,
default,
TenantId
), new WorkflowInput(new Signal(normalizedSignal, input)), cancellationToken);
请注意,上面的代码片段正在使用 SignalReceived activity 和 SignalReceivedBookmark 书签构建工作流查询。
这些查询参数与您触发工作流程所需的查询参数不同,这些工作流程以您的自定义 SignalCustom activity 和 SignalCustomBookmark 开始或阻止 书签类型。
换句话说,您需要再执行两个步骤才能使其生效:
- 实现自定义控制器(假设您想从控制器触发自定义信号;)
- 自己调用 IWorkflowLaunchpad.CollectAndExecuteWorkflowsAsync 而不是依赖
ISignaler
。
更好的是:定义一个名为 ICustomSignaler
的新服务并让其实现完成工作。
例如:
public interface ICustomSignaler
{
/// <summary>
/// Runs all workflows that start with or are blocked on the <see cref="SignalCustom"/> activity.
/// </summary>
Task<IEnumerable<CollectedWorkflow>> TriggerSignalAsync(string signal, object? input = null, string? workflowInstanceId = null, string? correlationId = null, CancellationToken cancellationToken = default);
}
public class CustomSignaler : ICustomSignaler
{
public async Task<IEnumerable<CollectedWorkflow>> TriggerSignalAsync(string signal, object? input = default, string? workflowInstanceId = default, string? correlationId = default, CancellationToken cancellationToken = default)
{
var normalizedSignal = signal.ToLowerInvariant();
return await _workflowLaunchpad.CollectAndExecuteWorkflowsAsync(new WorkflowsQuery(
nameof(SignalCustom ),
new SignalCustomBookmark{ Signal = normalizedSignal },
correlationId,
workflowInstanceId,
default,
TenantId
), new WorkflowInput(new Signal(normalizedSignal, input)), cancellationToken);
}
您的自定义控制器可能看起来像这样:
[ApiController]
[Route("custom-signals/{signalName}/execute")]
[Produces("application/json")]
public class Execute : Controller
{
private readonly ICustomSignaler _signaler;
private readonly IEndpointContentSerializerSettingsProvider _serializerSettingsProvider;
public Execute(ICustomSignaler signaler, IEndpointContentSerializerSettingsProvider serializerSettingsProvider)
{
_signaler = signaler;
_serializerSettingsProvider = serializerSettingsProvider;
}
[HttpPost]
public async Task<IActionResult> Handle(string signalName, ExecuteSignalRequest request, CancellationToken cancellationToken = default)
{
var result = await _signaler.TriggerSignalAsync(signalName, request.Input, request.WorkflowInstanceId, request.CorrelationId, cancellationToken).ToList();
if (Response.HasStarted)
return new EmptyResult();
return Json(
new ExecuteSignalResponse(result.Select(x => new CollectedWorkflow(x.WorkflowInstanceId, x.ActivityId)).ToList()),
_serializerSettingsProvider.GetSettings());
}
}
我们正在跟踪两个问题,看看我们是否可以提供一种方法来允许用户扩展 SignalReceived activity 而无需过多的自定义代码: