在观察者的 OnNext() 方法中等待任务是否安全?

Is it safe to await a Task in OnNext() method of an Observer?

我创建了一个自定义 Observer,它基本上在其 OnNext() 方法中执行异步任务。

我想知道这样做是否是个好主意,因为 async void 不是 great

public class MessageObserver : IObserver<Message>
{
    private IDisposable _unsubscriber;
    private readonly IQueueClient _client;
    private readonly TelemetryClient _telemetryClient;

    public MessageObserver(IQueueClient client, TelemetryClient telemetryClient)
    {
        _client = client;
        _telemetryClient = telemetryClient;
    }

    public virtual void Subscribe(IObservable<Message> provider)
    {
        _unsubscriber = provider.Subscribe(this);
    }

    public virtual void Unsubscribe()
    {
        _unsubscriber.Dispose();
    }

    public virtual void OnCompleted()
    {
    }

    public virtual void OnError(Exception error)
    {
    }

    public virtual async void OnNext(Message message)
    {
        try
        {
            await _client.SendAsync(message);
        }
        catch (Exception ex)
        {
            _telemetryClient.TrackException(ex);
        }
    }
}

EDIT/Add代码

我有一个 API 我从 Angular 客户端向其 post 资源,一旦资源被记录到数据库中,我立即向 Azure 服务总线发送消息然后我return之前记录的实体

我不想在 return 返回客户端之前等待 Azure 服务总线消息被发送,所以我想通知 Rx Observer 我有一条新消息需要处理在另一个线程上异步。

这是我的结构:

    // POST: /api/management/campaign
    [HttpPost]
    public async Task<IActionResult> Create([FromBody] CampaignViewModel model)
    {
        try
        {
            if (ModelState.IsValid)
            {
                var createdCampaign = await _campaignService.CreateCampaign(Mapping.ToCampaign(model));
                _upsertServiceBus.SendMessage(new Message(Encoding.UTF8.GetBytes(createdCampaign.CampaignId.ToString())));
                return Ok(Mapping.ToCampaignViewModel(createdCampaign));
            }

            return BadRequest(ModelState);
        }
        catch (Exception ex)
        {
            _telemetryClient.TrackException(ex);
            return BadRequest(new OpenIdConnectResponse
            {
                Error = OpenIdConnectConstants.Errors.InvalidRequest,
                ErrorDescription = Constants.GenericError
            });
        }
    }

-

    public class BusService : IBusService
    {
        private readonly IObservable<Message> _messageObservable;
        private readonly ICollection<Message> _messages = new Collection<Message>();
        private readonly IQueueClient _queueClient;
        private readonly MessageObserver _messageObserver;
        private readonly TelemetryClient _telemetryClient;

        protected BusService(IConfiguration configuration, string queueName, TelemetryClient telemetryClient)
        {
            _telemetryClient = telemetryClient;
            _queueClient = new QueueClient(configuration["ServiceBusConnectionString"], queueName);
            _messageObservable = _messages.ToObservable();
            _messageObserver = new MessageObserver(_queueClient, _telemetryClient);
            _messageObserver.Subscribe(_messageObservable);
        }

        public void SendMessage(Message message)
        {
            _messageObserver.OnNext(message);
        }
    }

EDIT/Solution 在@Shlomo 的回答的帮助下:

public class BusService : IBusService
{
    private readonly IQueueClient _queueClient;
    private readonly TelemetryClient _telemetryClient;
    private readonly Subject<Message> _subject = new Subject<Message>();

    protected BusService(IConfiguration configuration, string queueName, TelemetryClient telemetryClient)
    {
        _telemetryClient = telemetryClient;
        _queueClient = new QueueClient(configuration["ServiceBusConnectionString"], queueName);
        _subject
            .ObserveOn(TaskPoolScheduler.Default)
            .SelectMany(message =>
            {
                return Observable.FromAsync(() =>
                {
                    var waitAndRetryPolicy = Policy
                        .Handle<Exception>()
                        .WaitAndRetryAsync(3, retryAttempt =>
                                TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
                            (exception, retryCount, context) =>
                            {
                                _telemetryClient.TrackEvent(
                                    $"Sending message to Azure Service Bus failed with exception ${exception.Message}. Retrying...");
                            }
                        );

                    return waitAndRetryPolicy.ExecuteAsync(async ct =>
                    {
                        _telemetryClient.TrackEvent("Sending message to Azure Service Bus...");
                        await _queueClient.SendAsync(message);
                    }, CancellationToken.None);
                });
            })
            .Subscribe(unit => { _telemetryClient.TrackEvent("Message sent to Azure Service Bus."); },
                ex => _telemetryClient.TrackException(ex));
    }

    public void SendMessage(Message message)
    {
        _subject.OnNext(message);
    }
}

我无法复制或测试,但希望这能让你入门。

此解决方案将 _messages_messageObserver_messageObservable 替换为主题和响应式查询。一些注意事项:

  • ObserveOn 允许您通过更改 'schedulers' 来转移线程。我选择了 TaskPoolScheduler,它将在不同的任务上执行其余的查询。
  • 我建议调用 _queueClient.SendAsync 的同步版本,因为此示例允许 Rx 处理线程。
  • 此解决方案使用 Rx 异常处理,在出现异常时将终止 observable/handling。如果你想让它自动重启,那就加一个.Catch/.Retry.

代码:

public class BusService : IBusService
{
    private readonly IQueueClient _queueClient;
    private readonly TelemetryClient _telemetryClient;
    private readonly Subject<Message> _subject = new Subject<Message>();

    protected BusService(IConfiguration configuration, string queueName, TelemetryClient telemetryClient)
    {
        _telemetryClient = telemetryClient;
        _queueClient = new QueueClient(configuration["ServiceBusConnectionString"], queueName);
        _subject
            .ObserveOn(TaskPoolScheduler.Default)  // handle on an available task
            .Select(msg => _queueClient.Send(msg)) // syncronous, not async, because already on a different task
            .Subscribe(result => { /* log normal result */}, ex => _telemetryClient.TrackException(e), () => {});
    }

    public void SendMessage(Message message)
    {
        _subject.OnNext(message);
    }
}

正如我提到的,代码使用了 Subject,您会在此处找到大量不推荐它们的问答。如果你愿意,你可以用一个事件和一个基于该事件的可观察对象来替换主题。主题更容易展示,我会在保密的时候争论。