Azure 服务总线主题的 Service Fabric Actor 订阅

Service Fabric Actor subscription to Azure Service Bus Topic

我正在考虑构建一个系统,该系统要求 Actors 使用特定于 Actor 实例的过滤器创建对 Azure 服务总线主题的订阅。我的问题是,如果 Actor(已订阅主题)已在 Service Fabric 中停用,它会被 Azure 服务总线发送的新消息(重新)激活吗?

谢谢

您不会通过收到消息激活演员。它仅通过远程呼叫和提醒激活。所以这个方法行不通。

您可以做的是在 Service 中接收消息,并将它们转发到 Actor 实例。如果需要,调用 Actor 会即时创建实例。

基于Actor's lifecycle,它必须被激活。来自主题的 Azure 服务总线消息不会激活参与者。相反,您需要一个主管进程来执行此操作。消息可以包含 属性 来表示所需的参与者 ID。它还可以通过单一主题和横向扩展主管来简化 Azure 服务总线拓扑。

这可以通过提醒轻松实现。 由于需要先调用actor,所以可以这样做。

创建方法将设置连接字符串、主题名称、订阅名称并在需要时创建它们。提醒将检查订阅客户端是否不为空,如果是则创建它。提醒将始终在失败时执行,这样您将能够控制失败并在崩溃时重新启动它。

https://github.com/Huachao/azure-content/blob/master/articles/service-fabric/service-fabric-reliable-actors-timers-reminders.md

public async Task<bool> CreateAsync(BusOptions options, CancellationToken cancellationToken)
    {
        if (options?.ConnectionString == null)
        {
            return false;
        }
        await StateManager.AddOrUpdateStateAsync("Options", options,(k,v) => v != options? options:v, cancellationToken);

        var client = new ManagementClient(options.ConnectionString);
        try
        {
            var exist = await client.TopicExistsAsync(options.TopicName, cancellationToken);
            if (!exist)
            {
               await client.CreateTopicAsync(options.TopicName, cancellationToken);
            }
            exist = await client.SubscriptionExistsAsync(options.TopicName, options.SubscriptionName, cancellationToken);
            if (!exist)
            {
                await client.CreateSubscriptionAsync(options.TopicName, options.SubscriptionName, cancellationToken);
            }
            var rules =await client.GetRulesAsync(options.TopicName,options.SubscriptionName,cancellationToken: cancellationToken);
            if(rules.FirstOrDefault(x=>x.Name == options.RuleName) == null)
            {
                SqlFilter filter = new SqlFilter(options.RuleFilterSqlValue);
                await client.CreateRuleAsync(options.TopicName, options.SubscriptionName, new RuleDescription(options.RuleName, filter));
            }

        }
        catch (Exception ex)
        {
            ActorEventSource.Current.ActorMessage(this, ex.Message);                
        }
        return true;
    }
    public async Task DeleteAsync(BusOptions options, CancellationToken cancellationToken)
    {
        var client = new ManagementClient(options.ConnectionString);
        try
        {
            await client.DeleteRuleAsync(options.TopicName, options.SubscriptionName, options.RuleName, cancellationToken);
            await client.DeleteSubscriptionAsync(options.TopicName, options.SubscriptionName, cancellationToken);
        }
        catch (Exception ex)
        {
            ActorEventSource.Current.ActorMessage(this, ex.Message);
        }

    }
    private ISubscriptionClient subscriptionClient;       
    public async Task<bool> SendAsync(SendMessage message, CancellationToken cancellationToken)
    {
        var options =await StateManager.TryGetStateAsync<BusOptions>("Options");
        if (!options.HasValue)
        {
            ActorEventSource.Current.ActorMessage(this, "First execute CreateAsync. No options set.");
            return false;
        }


        var client = new TopicClient(options.Value.ConnectionString,options.Value.TopicName);

        var msg = new Message(message.Body);
        if(message.UserProperties != null)
        {
            foreach (var item in message.UserProperties)
            {
                msg.UserProperties.Add(item);
            }
        }
        msg.Label = message.Label;



       await client.SendAsync(msg);
       await StateManager.AddOrUpdateStateAsync("Messages_Send", 1, (key, value) => 1 > value ? 1 : value, cancellationToken);

        return true;
    }
    void RegisterOnMessageHandlerAndReceiveMessages()
    {
        var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
        {                
            MaxConcurrentCalls = 1,
            AutoComplete = false
        };
        subscriptionClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);
    }
    async Task ProcessMessagesAsync(Message message, CancellationToken cancellationToken)
    {
        ActorEventSource.Current.ActorMessage(this, message.Label);

        await subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);


    }
    Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
    {

        var context = exceptionReceivedEventArgs.ExceptionReceivedContext;
        ActorEventSource.Current.ActorMessage(this,
            string.Format("Exception context for troubleshooting: - Endpoint: {0}- Entity Path: {1}- Executing Action: {2} - MEssage: {3}",
            context.Endpoint,context.EntityPath,context,exceptionReceivedEventArgs.Exception.Message));
        return Task.CompletedTask;
    }
    protected override async Task OnActivateAsync()
    {
        ActorEventSource.Current.ActorMessage(this, $"Actor '{Id.GetStringId()}' activated.");

        IActorReminder Recieve_Message = await this.RegisterReminderAsync(
                        "Recieve_Message",
                        null,
                        TimeSpan.FromSeconds(1),    //The amount of time to delay before firing the reminder
                        TimeSpan.FromSeconds(1));


    }
    public async Task ReceiveReminderAsync(string reminderName, byte[] state, TimeSpan dueTime, TimeSpan period)
    {
        if (reminderName.Equals("Recieve_Message"))
        {
            if(subscriptionClient == null)
            {
                var options = await StateManager.TryGetStateAsync<BusOptions>("Options");
                if (!options.HasValue)
                {
                    ActorEventSource.Current.ActorMessage(this, "First execute CreateAsync. No options set.");
                    return;
                }

                var conn = new ServiceBusConnectionStringBuilder(options.Value.ConnectionString);

                subscriptionClient = new SubscriptionClient(options.Value.ConnectionString, options.Value.TopicName, options.Value.SubscriptionName);

                RegisterOnMessageHandlerAndReceiveMessages();
            }

        }


    }