如果连接断开,如何正确处理客户端流?

How to correctly dispose a client stream if the connection has disconnected?

我正在使用 Microsoft.AspNetCore.SignalR 2.1 v1.0.4,并且有一个 ChannelReader 流被使用 v1.0.4 的打字稿客户端使用。

通道显示特定于单个实体的事件数据,因此当客户端的用户导航到该单个实体的页面呈现数据时,预计客户端会订阅通道。如果用户导航到同一页面但针对不同的实体,则客户端将进行另一个订阅调用。

现在我的问题是关于如何最好地取消订阅流,以及一般来说,在集线器连接 stop/start 场景下流对客户端的生命周期是多少,以及服务器是否显式中止连接(由于 access_token 超时等触发客户端刷新连接)?

api 似乎没有显示某些连接状态,因此我目前使用 RxJs Subject 将某些连接状态显示到我的 UI components/services,即当集线器连接的启动调用成功 I surface "true",当 onclose 回调被调用时 I surface "false"。这使我可以尝试在先前订阅的流上调用 dispose 以在连接期间清理内容 disconnect/stop,然后在必要时在成功开始调用时再次调用订阅流。

我试过在流上调用 dispose,如果集线器已连接,这很好,但如果连接处于断开状态,则会出错。我想知道这是否是一个错误。即使集线器断开连接,我也应该能够处理流吗?

只做一个 delete streamsubscription 然后根据需要重新创建是否可以,或者这会以任何方式泄漏吗?

what the lifetime of the stream is to the client under hub connection stop/start scenarios, and if the server explicitly aborts a connection (due to access_token timeouts and so to trigger the client to refresh their connection).

当连接终止时(由于在客户端调用 stop 或服务器中止连接),订阅者的 error 方法将被调用,并显示错误指示流已终止,因为连接已终止。通常,您应该处理 error 方法并将其视为终止事件(即流永远不会产生额外的对象)。在服务器上,如果连接终止(任何一方),将触发 Context.ConnectionAborted 令牌,您可以停止写入您的流。

如果您已经在使用 RxJS,我强烈建议您构建一个小型包装器,将您从 SignalR 返回的对象转换为适当的 RxJS Observable。我们 return 的对象实际上 不是 Observable,但它具有所有相同的基本方法(一个 subscribe 方法,它接受一个带有 completenexterror 方法),所以包装起来应该很简单。

I have tried calling dispose on a stream which is fine if the hub is connected, but it errors if the connection is in a disconnected state. I'm wondering if this is a bug or not.

是的,这可能是一个错误。如果您在集线器断开连接后进行处理,我们不应该抛出。您可以在 https://github.com/aspnet/SignalR 上提交吗?要解决它,您可以相当安全地 try...catch 错误并抑制它(或者如果您偏执,则可能记录它)。

Is it okay to just do a delete streamsubscription and then recreate as required, or will this leak in any way?

您应该始终dispose订阅。如果您只是 delete 它,那么我们无法知道您已经完成了它,我们也永远不会告诉服务器停止。如果您调用 dispose(并且已连接),我们会向服务器 "cancelling" 发送消息流。在 ASP.NET Core 2.1 中,我们不会向您公开此取消,但我们会停止从 ChannelReader 中读取数据。在 ASP.NET Core 2.2 中,我们允许您在 Hub 方法中接受 CancellationToken,并且客户端上的 dispose 方法将在 Hub 方法中触发此令牌。我强烈建议您尝试 ASP.NET Core 2.2 的最新预览版,并在您的 Hub 方法中使用 CancellationToken 来停止流:

public ChannelReader<object> MyStreamingMethod(..., CancellationToken cancellationToken) {
    // pass 'cancellationToken' over to whatever process is writing to the channel
    // and stop writing when the token is triggered
}

注意:如果您这样做,则无需监控 Context.ConnectionAborted,传递给您的 Hub 方法的令牌将涵盖所有取消情况。

在相关说明中,您应该始终 使用Channel.CreateBounded<T>(size) 来创建您的频道。如果您使用无界通道,则更容易泄漏内存,因为编写器可以无限期地继续写入。如果您使用有界频道,如果频道中有 size 个未读项目,写入器将停止(WriteAsyncWaitToWriteAsync 将 "block")(因为,对于例如,客户端已断开连接,我们已停止读取)。