为什么 ICommunicationListener.Abort() 不断被调用?

Why is ICommunicationListener.Abort() continuously invoked?

下面是本地 SF 集群中的无状态服务侦听器代码 运行 并侦听服务总线队列。

我只想在永远在线的 SF 服务中不断地监听队列命令。欢迎提供代码改进提示!

问题#1

不断调用 Abort(),实际上关闭了我的连接。是什么导致了这种行为,我该如何解决?在我的理解中,Abort() 应该只在未处理的异常或强制关闭的情况下被调用,这两种情况我都不知道。

奖金问题

假设我们注释掉了来自 Abort() 的 CloseClient 调用,这使得我们的队列能够得到正确处理。在第一条消息之后,CancellationToken 的 WaitHandle 被标记为已处置并将其传递到我的回调中会引发异常。这是什么原因造成的?

感谢您的帮助!

using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.ServiceBus.Messaging;
using Microsoft.ServiceFabric.Services.Communication.Runtime;

namespace Common
{
    public class QueueListener : ICommunicationListener
    {
        private readonly string _connectionString;
        private readonly string _path;
        private readonly Action<BrokeredMessage> _callback;

        private QueueClient _client;

        public QueueListener(string connectionString, string path, Action<BrokeredMessage> callback)
        {
            // Set field values
            _connectionString = connectionString;
            _path = path;

            // Save callback action
            _callback = callback;
        }

        public Task<string> OpenAsync(CancellationToken cancellationToken)
        {
            // Connect to subscription
            _client = QueueClient.CreateFromConnectionString(_connectionString, _path);

            // Configure the callback options
            var options = new OnMessageOptions
            {
                AutoComplete = false
            };

            // Catch and throw exceptions
            options.ExceptionReceived += (sender, args) => throw args.Exception;

            // Wire callback on message receipt
            _client.OnMessageAsync(message =>
            {
                return Task.Run(() => _callback(message), cancellationToken)
                    .ContinueWith(task =>
                    {
                        if (task.Status == TaskStatus.RanToCompletion)
                            message.CompleteAsync();
                        else
                            message.AbandonAsync();
                    }, cancellationToken);
            }, options);

            return Task.FromResult(_client.Path);
        }

        public Task CloseAsync(CancellationToken cancellationToken)
        {
            CloseClient();
            return Task.FromResult(_client.Path);
        }

        public void Abort()
        {
            CloseClient();
        }

        private void CloseClient()
        {
            // Make sure client is still open
            if (_client == null || _client.IsClosed)
                return;

            // Close connection
            _client.Close();
            _client = null;
        }
    }
}

传递给 OpenAsync 的取消令牌不打算像现在这样在方法范围之外使用。您正在将其传递给 OnMessage。调用 OpenAsync 后将调用 OnMessage,但令牌会在 OpenAsyn 完成后处理。

创建一个新的 CancellationToken,或者使用这个 nuget 包应该会有所帮助 here

问题在于创建 ICommunicationListener 以与服务总线交互的整个概念;听众什么也没听!

将服务总线连接重构为 StatelessService 实现中的默认 RunAsync() 方法修复了问题并且允许按预期监视取消标记。