为什么 ICommunicationListener.Abort() 不断被调用?
Why is ICommunicationListener.Abort() continuously invoked?
下面是本地 SF 集群中的无状态服务侦听器代码 运行 并侦听服务总线队列。
我只想在永远在线的 SF 服务中不断地监听队列命令。欢迎提供代码改进提示!
问题#1
不断调用 Abort(),实际上关闭了我的连接。是什么导致了这种行为,我该如何解决?在我的理解中,Abort() 应该只在未处理的异常或强制关闭的情况下被调用,这两种情况我都不知道。
奖金问题
假设我们注释掉了来自 Abort() 的 CloseClient 调用,这使得我们的队列能够得到正确处理。在第一条消息之后,CancellationToken 的 WaitHandle 被标记为已处置并将其传递到我的回调中会引发异常。这是什么原因造成的?
感谢您的帮助!
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.ServiceBus.Messaging;
using Microsoft.ServiceFabric.Services.Communication.Runtime;
namespace Common
{
public class QueueListener : ICommunicationListener
{
private readonly string _connectionString;
private readonly string _path;
private readonly Action<BrokeredMessage> _callback;
private QueueClient _client;
public QueueListener(string connectionString, string path, Action<BrokeredMessage> callback)
{
// Set field values
_connectionString = connectionString;
_path = path;
// Save callback action
_callback = callback;
}
public Task<string> OpenAsync(CancellationToken cancellationToken)
{
// Connect to subscription
_client = QueueClient.CreateFromConnectionString(_connectionString, _path);
// Configure the callback options
var options = new OnMessageOptions
{
AutoComplete = false
};
// Catch and throw exceptions
options.ExceptionReceived += (sender, args) => throw args.Exception;
// Wire callback on message receipt
_client.OnMessageAsync(message =>
{
return Task.Run(() => _callback(message), cancellationToken)
.ContinueWith(task =>
{
if (task.Status == TaskStatus.RanToCompletion)
message.CompleteAsync();
else
message.AbandonAsync();
}, cancellationToken);
}, options);
return Task.FromResult(_client.Path);
}
public Task CloseAsync(CancellationToken cancellationToken)
{
CloseClient();
return Task.FromResult(_client.Path);
}
public void Abort()
{
CloseClient();
}
private void CloseClient()
{
// Make sure client is still open
if (_client == null || _client.IsClosed)
return;
// Close connection
_client.Close();
_client = null;
}
}
}
传递给 OpenAsync
的取消令牌不打算像现在这样在方法范围之外使用。您正在将其传递给 OnMessage
。调用 OpenAsync 后将调用 OnMessage,但令牌会在 OpenAsyn 完成后处理。
创建一个新的 CancellationToken
,或者使用这个 nuget 包应该会有所帮助 here。
问题在于创建 ICommunicationListener 以与服务总线交互的整个概念;听众什么也没听!
将服务总线连接重构为 StatelessService 实现中的默认 RunAsync() 方法修复了问题并且允许按预期监视取消标记。
下面是本地 SF 集群中的无状态服务侦听器代码 运行 并侦听服务总线队列。
我只想在永远在线的 SF 服务中不断地监听队列命令。欢迎提供代码改进提示!
问题#1
不断调用 Abort(),实际上关闭了我的连接。是什么导致了这种行为,我该如何解决?在我的理解中,Abort() 应该只在未处理的异常或强制关闭的情况下被调用,这两种情况我都不知道。
奖金问题
假设我们注释掉了来自 Abort() 的 CloseClient 调用,这使得我们的队列能够得到正确处理。在第一条消息之后,CancellationToken 的 WaitHandle 被标记为已处置并将其传递到我的回调中会引发异常。这是什么原因造成的?
感谢您的帮助!
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.ServiceBus.Messaging;
using Microsoft.ServiceFabric.Services.Communication.Runtime;
namespace Common
{
public class QueueListener : ICommunicationListener
{
private readonly string _connectionString;
private readonly string _path;
private readonly Action<BrokeredMessage> _callback;
private QueueClient _client;
public QueueListener(string connectionString, string path, Action<BrokeredMessage> callback)
{
// Set field values
_connectionString = connectionString;
_path = path;
// Save callback action
_callback = callback;
}
public Task<string> OpenAsync(CancellationToken cancellationToken)
{
// Connect to subscription
_client = QueueClient.CreateFromConnectionString(_connectionString, _path);
// Configure the callback options
var options = new OnMessageOptions
{
AutoComplete = false
};
// Catch and throw exceptions
options.ExceptionReceived += (sender, args) => throw args.Exception;
// Wire callback on message receipt
_client.OnMessageAsync(message =>
{
return Task.Run(() => _callback(message), cancellationToken)
.ContinueWith(task =>
{
if (task.Status == TaskStatus.RanToCompletion)
message.CompleteAsync();
else
message.AbandonAsync();
}, cancellationToken);
}, options);
return Task.FromResult(_client.Path);
}
public Task CloseAsync(CancellationToken cancellationToken)
{
CloseClient();
return Task.FromResult(_client.Path);
}
public void Abort()
{
CloseClient();
}
private void CloseClient()
{
// Make sure client is still open
if (_client == null || _client.IsClosed)
return;
// Close connection
_client.Close();
_client = null;
}
}
}
传递给 OpenAsync
的取消令牌不打算像现在这样在方法范围之外使用。您正在将其传递给 OnMessage
。调用 OpenAsync 后将调用 OnMessage,但令牌会在 OpenAsyn 完成后处理。
创建一个新的 CancellationToken
,或者使用这个 nuget 包应该会有所帮助 here。
问题在于创建 ICommunicationListener 以与服务总线交互的整个概念;听众什么也没听!
将服务总线连接重构为 StatelessService 实现中的默认 RunAsync() 方法修复了问题并且允许按预期监视取消标记。