在 RX 和 Signalr 中返回错误

Returning an error in RX and Signalr

我有一个信号器(.Net Core 版本)集线器,returns 使用 IObservable 的数据流。这几乎是直接从这个流式示例中提取的: https://github.com/radu-matei/signalr-samples/tree/master/streaming/web

public IObservable<MyStreamItem> StreamData(SomeRequestData request)
{}

我想验证传入的请求是否正常。只是标准验证的东西。有效的电子邮件、日期等

所以我正在尝试使用 Observable.Throw 将此错误反馈给客户端

public IObservable<MyStreamItem> StreamData(SomeRequestData request)
{
    if (!IsValid(request))
    {
        return Observable.Throw<MyStreamItem>(new Exception("Invalid Request"));
    }   
}

是这样吗? 错误消息以某种方式丢失在与浏览器的通信中,因此流终止,但没有错误消息发送到客户端

我最初尝试在一些中间件中这样做,但似乎只有 websocket 的初始创建被中间件捕获,而不是后续调用

知道我做错了什么吗?

如果您按照链接示例中的方式进行操作,则应该这样做:

observer.OnError(new Exception("Invalid Request"));

在示例中,他的错误函数错误(名为'err')。应该是:

error: function(error) {
    console.log(error);
}

为了适应传递的异常,我会更改上面的内容,只发送另一个 OnNext,但传递一个这样的对象:

StreamingHub

public class Message
{
    public string Text { get; set; }

    public Exception Error { get; set; }
}

public IObservable<Message> StartStreaming()
{
    return Observable.Create(
        async (IObserver<Message> observer) =>
        {
            for (int i = 0; i < 2; i++)
            {
                observer.OnNext(new Message { Text = $"sending...{i}" });
                await Task.Delay(1000);
            }
            observer.OnNext(new Message { Error = new Exception("Invalid Request") });
        });

index.html

function onStreamReceived(data){
    if (data.text) {
        console.log("received: " + data.text);
        var liElement = document.createElement('li');
        liElement.innerHTML = '<strong>' + "received" + '</strong>:&nbsp;&nbsp;' + data.text;
        document.getElementById('discussion').appendChild(liElement);
    }
    if (data.error) {
        console.log("error: " + data.error.Message);
    }
}

这是一个已知问题,现在有一个拉取请求将异常从 Stream hub 方法传播到现在打开的客户端。它将出现在即将发布的 SignalR Core 的 Preview1 版本中。 Link 到拉取请求:https://github.com/aspnet/SignalR/pull/1331

为了帮助您了解 Rx,我建议您重写 StreamData。当调用 StreamData 和订阅结果流之间有一段时间时,当前的实现会产生问题。调用 StreamData 时请求可能无效,但不久之后,在订阅之前,请求无效。

你可以这样写:

public IObservable<MyStreamItem> StreamData(SomeRequestData request)
{
    return Observable.Defer(() =>
    {
        if (!IsValid(request))
        {
            return Observable.Throw<MyStreamItem>(new Exception("Invalid Request"));
        }
        else
            //return observable when request is valid
    });
}