在观察者的 OnNext() 方法中等待任务是否安全?
Is it safe to await a Task in OnNext() method of an Observer?
我创建了一个自定义 Observer,它基本上在其 OnNext() 方法中执行异步任务。
我想知道这样做是否是个好主意,因为通常并不推荐使用 async void。
/// <summary>
/// Observer that forwards every received <see cref="Message"/> to a queue client
/// and reports failures to telemetry.
/// </summary>
public class MessageObserver : IObserver<Message>
{
    // Subscription handle returned by the provider; null until Subscribe() is called.
    private IDisposable _unsubscriber;
    private readonly IQueueClient _client;
    private readonly TelemetryClient _telemetryClient;

    /// <summary>Creates an observer that sends through <paramref name="client"/>.</summary>
    /// <param name="client">Queue client used to send each message.</param>
    /// <param name="telemetryClient">Telemetry sink for exceptions.</param>
    public MessageObserver(IQueueClient client, TelemetryClient telemetryClient)
    {
        _client = client;
        _telemetryClient = telemetryClient;
    }

    /// <summary>Subscribes this observer to <paramref name="provider"/> and keeps the subscription handle.</summary>
    public virtual void Subscribe(IObservable<Message> provider)
    {
        _unsubscriber = provider.Subscribe(this);
    }

    /// <summary>Disposes the current subscription, if there is one.</summary>
    public virtual void Unsubscribe()
    {
        // Null-conditional: Unsubscribe() may legitimately be called before Subscribe().
        _unsubscriber?.Dispose();
    }

    /// <summary>Nothing to do when the source completes.</summary>
    public virtual void OnCompleted()
    {
    }

    /// <summary>Reports an error signalled by the source instead of silently dropping it.</summary>
    public virtual void OnError(Exception error)
    {
        _telemetryClient.TrackException(error);
    }

    /// <summary>Sends <paramref name="message"/> through the queue client without blocking the caller.</summary>
    /// <remarks>
    /// The method is <c>async void</c> because <see cref="IObserver{T}.OnNext"/> must return void.
    /// The try/catch routes a failed send to telemetry so it does not escape the method.
    /// </remarks>
    public virtual async void OnNext(Message message)
    {
        try
        {
            await _client.SendAsync(message);
        }
        catch (Exception ex)
        {
            _telemetryClient.TrackException(ex);
        }
    }
}
EDIT/Add代码
我有一个 API,Angular 客户端向它 POST 资源。资源写入数据库后,我立即向 Azure 服务总线发送一条消息,然后返回刚刚保存的实体。
我不想等到 Azure 服务总线消息发送完成之后才向客户端返回,所以我想通知一个 Rx Observer:有一条新消息需要在另一个线程上异步处理。
这是我的结构:
// POST: /api/management/campaign
/// <summary>
/// Creates a campaign from the posted view model, hands a message carrying the new
/// campaign id to the bus service, and returns the created campaign.
/// </summary>
/// <param name="model">Campaign data bound from the request body.</param>
/// <returns>
/// 200 with the created campaign view model; 400 with the model state when validation fails;
/// 400 with a generic error payload when any exception is thrown.
/// </returns>
[HttpPost]
public async Task<IActionResult> Create([FromBody] CampaignViewModel model)
{
    try
    {
        // Guard clause: reject invalid payloads before touching the service layer.
        if (!ModelState.IsValid)
        {
            return BadRequest(ModelState);
        }

        var campaign = Mapping.ToCampaign(model);
        var createdCampaign = await _campaignService.CreateCampaign(campaign);

        // The message body is the UTF-8 encoded campaign id.
        var payload = Encoding.UTF8.GetBytes(createdCampaign.CampaignId.ToString());
        _upsertServiceBus.SendMessage(new Message(payload));

        return Ok(Mapping.ToCampaignViewModel(createdCampaign));
    }
    catch (Exception ex)
    {
        _telemetryClient.TrackException(ex);

        var errorResponse = new OpenIdConnectResponse
        {
            Error = OpenIdConnectConstants.Errors.InvalidRequest,
            ErrorDescription = Constants.GenericError
        };
        return BadRequest(errorResponse);
    }
}
-
/// <summary>
/// Creates a queue client and a <see cref="MessageObserver"/> bound to it, and hands
/// outgoing messages to that observer.
/// </summary>
public class BusService : IBusService
{
    private readonly TelemetryClient _telemetryClient;
    private readonly IQueueClient _queueClient;
    private readonly ICollection<Message> _messages = new Collection<Message>();
    private readonly IObservable<Message> _messageObservable;
    private readonly MessageObserver _messageObserver;

    /// <summary>Builds the queue client for <paramref name="queueName"/> and wires up the observer.</summary>
    /// <param name="configuration">Configuration providing "ServiceBusConnectionString".</param>
    /// <param name="queueName">Name of the target queue.</param>
    /// <param name="telemetryClient">Telemetry sink passed on to the observer.</param>
    protected BusService(IConfiguration configuration, string queueName, TelemetryClient telemetryClient)
    {
        _telemetryClient = telemetryClient;

        var connectionString = configuration["ServiceBusConnectionString"];
        _queueClient = new QueueClient(connectionString, queueName);

        _messageObserver = new MessageObserver(_queueClient, _telemetryClient);

        // NOTE(review): _messages is still empty here and nothing in this class adds to it,
        // so this subscription presumably never delivers a message. Messages reach the
        // observer only through the direct OnNext call in SendMessage.
        _messageObservable = _messages.ToObservable();
        _messageObserver.Subscribe(_messageObservable);
    }

    /// <summary>Passes <paramref name="message"/> straight to the observer's OnNext.</summary>
    public void SendMessage(Message message) => _messageObserver.OnNext(message);
}
EDIT/Solution 在@Shlomo 的回答的帮助下:
/// <summary>
/// Sends messages to an Azure Service Bus queue off the caller's thread, retrying each
/// send with exponential back-off and reporting progress and failures to telemetry.
/// </summary>
public class BusService : IBusService
{
    private readonly IQueueClient _queueClient;
    private readonly TelemetryClient _telemetryClient;
    private readonly Subject<Message> _subject = new Subject<Message>();

    /// <summary>Creates the queue client and starts the pipeline that drains the subject.</summary>
    /// <param name="configuration">Configuration providing "ServiceBusConnectionString".</param>
    /// <param name="queueName">Name of the target queue.</param>
    /// <param name="telemetryClient">Telemetry sink for events and exceptions.</param>
    protected BusService(IConfiguration configuration, string queueName, TelemetryClient telemetryClient)
    {
        _telemetryClient = telemetryClient;
        _queueClient = new QueueClient(configuration["ServiceBusConnectionString"], queueName);

        // Built once and shared by every send; nothing in it depends on the message.
        // Up to 3 retries, waiting 2^attempt seconds before each.
        var waitAndRetryPolicy = Policy
            .Handle<Exception>()
            .WaitAndRetryAsync(
                3,
                retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
                (exception, retryCount, context) =>
                {
                    // Plain {…} hole: in C# a '$' inside the string is a literal character.
                    _telemetryClient.TrackEvent(
                        $"Sending message to Azure Service Bus failed with exception {exception.Message}. Retrying...");
                });

        _subject
            .ObserveOn(TaskPoolScheduler.Default)
            .SelectMany(message =>
                Observable
                    .FromAsync(() => waitAndRetryPolicy.ExecuteAsync(async ct =>
                    {
                        _telemetryClient.TrackEvent("Sending message to Azure Service Bus...");
                        await _queueClient.SendAsync(message);
                    }, CancellationToken.None))
                    // Handle a message that failed every retry here, inside its own inner sequence.
                    // Otherwise the error would end the outer subscription and later messages
                    // would be silently dropped.
                    .Catch((Exception ex) =>
                    {
                        _telemetryClient.TrackException(ex);
                        return Observable.Empty<System.Reactive.Unit>();
                    }))
            .Subscribe(
                unit => { _telemetryClient.TrackEvent("Message sent to Azure Service Bus."); },
                ex => _telemetryClient.TrackException(ex));
    }

    /// <summary>Queues <paramref name="message"/> for sending and returns immediately.</summary>
    public void SendMessage(Message message)
    {
        _subject.OnNext(message);
    }
}
我无法复现或测试这段代码,但希望它能帮你起步。
此解决方案用一个 Subject 和一个响应式查询替换了 _messages、_messageObserver 和 _messageObservable。一些注意事项:
- ObserveOn 允许您通过切换调度器(scheduler)来转移线程。我选择了 TaskPoolScheduler,它会在另一个任务上执行查询的其余部分。
- 我建议调用 _queueClient.SendAsync 的同步版本,因为此示例让 Rx 来处理线程。
- 此解决方案使用 Rx 的异常处理:一旦出现异常,可观察序列及其处理就会终止。如果你希望它自动重启,可以加上 .Catch 或 .Retry。
代码:
/// <summary>
/// Sends messages to a queue through a private subject whose notifications are
/// handled on the task pool.
/// </summary>
public class BusService : IBusService
{
    private readonly IQueueClient _queueClient;
    private readonly TelemetryClient _telemetryClient;
    private readonly Subject<Message> _subject = new Subject<Message>();

    /// <summary>Creates the queue client and subscribes the sending pipeline to the subject.</summary>
    /// <param name="configuration">Configuration providing "ServiceBusConnectionString".</param>
    /// <param name="queueName">Name of the target queue.</param>
    /// <param name="telemetryClient">Telemetry sink for exceptions.</param>
    protected BusService(IConfiguration configuration, string queueName, TelemetryClient telemetryClient)
    {
        _telemetryClient = telemetryClient;
        _queueClient = new QueueClient(configuration["ServiceBusConnectionString"], queueName);
        _subject
            .ObserveOn(TaskPoolScheduler.Default) // handle on an available task
            // NOTE(review): assumes the queue client exposes a synchronous Send that returns a
            // value; confirm against the Service Bus SDK version in use.
            .Select(msg => _queueClient.Send(msg)) // synchronous, not async, because already on a different task
            // The error callback must use its own parameter name (ex).
            .Subscribe(result => { /* log normal result */ }, ex => _telemetryClient.TrackException(ex), () => { });
    }

    /// <summary>Pushes <paramref name="message"/> into the subject for sending.</summary>
    public void SendMessage(Message message)
    {
        _subject.OnNext(message);
    }
}
正如我提到的,代码使用了 Subject,您会在此处找到大量不推荐使用它的问答。如果你愿意,可以用一个事件以及基于该事件的可观察对象来替换 Subject。Subject 更便于演示,而且我认为只要保持私有(private),使用它就没有问题。
我创建了一个自定义 Observer,它基本上在其 OnNext() 方法中执行异步任务。
我想知道这样做是否是个好主意,因为通常并不推荐使用 async void。
/// <summary>
/// Observer that forwards every received <see cref="Message"/> to a queue client
/// and reports failures to telemetry.
/// </summary>
public class MessageObserver : IObserver<Message>
{
    // Subscription handle returned by the provider; null until Subscribe() is called.
    private IDisposable _unsubscriber;
    private readonly IQueueClient _client;
    private readonly TelemetryClient _telemetryClient;

    /// <summary>Creates an observer that sends through <paramref name="client"/>.</summary>
    /// <param name="client">Queue client used to send each message.</param>
    /// <param name="telemetryClient">Telemetry sink for exceptions.</param>
    public MessageObserver(IQueueClient client, TelemetryClient telemetryClient)
    {
        _client = client;
        _telemetryClient = telemetryClient;
    }

    /// <summary>Subscribes this observer to <paramref name="provider"/> and keeps the subscription handle.</summary>
    public virtual void Subscribe(IObservable<Message> provider)
    {
        _unsubscriber = provider.Subscribe(this);
    }

    /// <summary>Disposes the current subscription, if there is one.</summary>
    public virtual void Unsubscribe()
    {
        // Null-conditional: Unsubscribe() may legitimately be called before Subscribe().
        _unsubscriber?.Dispose();
    }

    /// <summary>Nothing to do when the source completes.</summary>
    public virtual void OnCompleted()
    {
    }

    /// <summary>Reports an error signalled by the source instead of silently dropping it.</summary>
    public virtual void OnError(Exception error)
    {
        _telemetryClient.TrackException(error);
    }

    /// <summary>Sends <paramref name="message"/> through the queue client without blocking the caller.</summary>
    /// <remarks>
    /// The method is <c>async void</c> because <see cref="IObserver{T}.OnNext"/> must return void.
    /// The try/catch routes a failed send to telemetry so it does not escape the method.
    /// </remarks>
    public virtual async void OnNext(Message message)
    {
        try
        {
            await _client.SendAsync(message);
        }
        catch (Exception ex)
        {
            _telemetryClient.TrackException(ex);
        }
    }
}
EDIT/Add代码
我有一个 API,Angular 客户端向它 POST 资源。资源写入数据库后,我立即向 Azure 服务总线发送一条消息,然后返回刚刚保存的实体。
我不想等到 Azure 服务总线消息发送完成之后才向客户端返回,所以我想通知一个 Rx Observer:有一条新消息需要在另一个线程上异步处理。
这是我的结构:
// POST: /api/management/campaign
/// <summary>
/// Creates a campaign from the posted view model, hands a message carrying the new
/// campaign id to the bus service, and returns the created campaign.
/// </summary>
/// <param name="model">Campaign data bound from the request body.</param>
/// <returns>
/// 200 with the created campaign view model; 400 with the model state when validation fails;
/// 400 with a generic error payload when any exception is thrown.
/// </returns>
[HttpPost]
public async Task<IActionResult> Create([FromBody] CampaignViewModel model)
{
    try
    {
        // Guard clause: reject invalid payloads before touching the service layer.
        if (!ModelState.IsValid)
        {
            return BadRequest(ModelState);
        }

        var campaign = Mapping.ToCampaign(model);
        var createdCampaign = await _campaignService.CreateCampaign(campaign);

        // The message body is the UTF-8 encoded campaign id.
        var payload = Encoding.UTF8.GetBytes(createdCampaign.CampaignId.ToString());
        _upsertServiceBus.SendMessage(new Message(payload));

        return Ok(Mapping.ToCampaignViewModel(createdCampaign));
    }
    catch (Exception ex)
    {
        _telemetryClient.TrackException(ex);

        var errorResponse = new OpenIdConnectResponse
        {
            Error = OpenIdConnectConstants.Errors.InvalidRequest,
            ErrorDescription = Constants.GenericError
        };
        return BadRequest(errorResponse);
    }
}
-
/// <summary>
/// Creates a queue client and a <see cref="MessageObserver"/> bound to it, and hands
/// outgoing messages to that observer.
/// </summary>
public class BusService : IBusService
{
    private readonly TelemetryClient _telemetryClient;
    private readonly IQueueClient _queueClient;
    private readonly ICollection<Message> _messages = new Collection<Message>();
    private readonly IObservable<Message> _messageObservable;
    private readonly MessageObserver _messageObserver;

    /// <summary>Builds the queue client for <paramref name="queueName"/> and wires up the observer.</summary>
    /// <param name="configuration">Configuration providing "ServiceBusConnectionString".</param>
    /// <param name="queueName">Name of the target queue.</param>
    /// <param name="telemetryClient">Telemetry sink passed on to the observer.</param>
    protected BusService(IConfiguration configuration, string queueName, TelemetryClient telemetryClient)
    {
        _telemetryClient = telemetryClient;

        var connectionString = configuration["ServiceBusConnectionString"];
        _queueClient = new QueueClient(connectionString, queueName);

        _messageObserver = new MessageObserver(_queueClient, _telemetryClient);

        // NOTE(review): _messages is still empty here and nothing in this class adds to it,
        // so this subscription presumably never delivers a message. Messages reach the
        // observer only through the direct OnNext call in SendMessage.
        _messageObservable = _messages.ToObservable();
        _messageObserver.Subscribe(_messageObservable);
    }

    /// <summary>Passes <paramref name="message"/> straight to the observer's OnNext.</summary>
    public void SendMessage(Message message) => _messageObserver.OnNext(message);
}
EDIT/Solution 在@Shlomo 的回答的帮助下:
/// <summary>
/// Sends messages to an Azure Service Bus queue off the caller's thread, retrying each
/// send with exponential back-off and reporting progress and failures to telemetry.
/// </summary>
public class BusService : IBusService
{
    private readonly IQueueClient _queueClient;
    private readonly TelemetryClient _telemetryClient;
    private readonly Subject<Message> _subject = new Subject<Message>();

    /// <summary>Creates the queue client and starts the pipeline that drains the subject.</summary>
    /// <param name="configuration">Configuration providing "ServiceBusConnectionString".</param>
    /// <param name="queueName">Name of the target queue.</param>
    /// <param name="telemetryClient">Telemetry sink for events and exceptions.</param>
    protected BusService(IConfiguration configuration, string queueName, TelemetryClient telemetryClient)
    {
        _telemetryClient = telemetryClient;
        _queueClient = new QueueClient(configuration["ServiceBusConnectionString"], queueName);

        // Built once and shared by every send; nothing in it depends on the message.
        // Up to 3 retries, waiting 2^attempt seconds before each.
        var waitAndRetryPolicy = Policy
            .Handle<Exception>()
            .WaitAndRetryAsync(
                3,
                retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
                (exception, retryCount, context) =>
                {
                    // Plain {…} hole: in C# a '$' inside the string is a literal character.
                    _telemetryClient.TrackEvent(
                        $"Sending message to Azure Service Bus failed with exception {exception.Message}. Retrying...");
                });

        _subject
            .ObserveOn(TaskPoolScheduler.Default)
            .SelectMany(message =>
                Observable
                    .FromAsync(() => waitAndRetryPolicy.ExecuteAsync(async ct =>
                    {
                        _telemetryClient.TrackEvent("Sending message to Azure Service Bus...");
                        await _queueClient.SendAsync(message);
                    }, CancellationToken.None))
                    // Handle a message that failed every retry here, inside its own inner sequence.
                    // Otherwise the error would end the outer subscription and later messages
                    // would be silently dropped.
                    .Catch((Exception ex) =>
                    {
                        _telemetryClient.TrackException(ex);
                        return Observable.Empty<System.Reactive.Unit>();
                    }))
            .Subscribe(
                unit => { _telemetryClient.TrackEvent("Message sent to Azure Service Bus."); },
                ex => _telemetryClient.TrackException(ex));
    }

    /// <summary>Queues <paramref name="message"/> for sending and returns immediately.</summary>
    public void SendMessage(Message message)
    {
        _subject.OnNext(message);
    }
}
我无法复现或测试这段代码,但希望它能帮你起步。
此解决方案用一个 Subject 和一个响应式查询替换了 _messages、_messageObserver 和 _messageObservable。一些注意事项:
- ObserveOn 允许您通过切换调度器(scheduler)来转移线程。我选择了 TaskPoolScheduler,它会在另一个任务上执行查询的其余部分。
- 我建议调用 _queueClient.SendAsync 的同步版本,因为此示例让 Rx 来处理线程。
- 此解决方案使用 Rx 的异常处理:一旦出现异常,可观察序列及其处理就会终止。如果你希望它自动重启,可以加上 .Catch 或 .Retry。
代码:
/// <summary>
/// Sends messages to a queue through a private subject whose notifications are
/// handled on the task pool.
/// </summary>
public class BusService : IBusService
{
    private readonly IQueueClient _queueClient;
    private readonly TelemetryClient _telemetryClient;
    private readonly Subject<Message> _subject = new Subject<Message>();

    /// <summary>Creates the queue client and subscribes the sending pipeline to the subject.</summary>
    /// <param name="configuration">Configuration providing "ServiceBusConnectionString".</param>
    /// <param name="queueName">Name of the target queue.</param>
    /// <param name="telemetryClient">Telemetry sink for exceptions.</param>
    protected BusService(IConfiguration configuration, string queueName, TelemetryClient telemetryClient)
    {
        _telemetryClient = telemetryClient;
        _queueClient = new QueueClient(configuration["ServiceBusConnectionString"], queueName);
        _subject
            .ObserveOn(TaskPoolScheduler.Default) // handle on an available task
            // NOTE(review): assumes the queue client exposes a synchronous Send that returns a
            // value; confirm against the Service Bus SDK version in use.
            .Select(msg => _queueClient.Send(msg)) // synchronous, not async, because already on a different task
            // The error callback must use its own parameter name (ex).
            .Subscribe(result => { /* log normal result */ }, ex => _telemetryClient.TrackException(ex), () => { });
    }

    /// <summary>Pushes <paramref name="message"/> into the subject for sending.</summary>
    public void SendMessage(Message message)
    {
        _subject.OnNext(message);
    }
}
正如我提到的,代码使用了 Subject,您会在此处找到大量不推荐使用它的问答。如果你愿意,可以用一个事件以及基于该事件的可观察对象来替换 Subject。Subject 更便于演示,而且我认为只要保持私有(private),使用它就没有问题。