监控新数据的网络流

Monitoring network stream for new data

我正在编写一个对来自某些网络设备的状态信息感兴趣的应用程序。其中一个设备通过 Http 提供状态信息并使用多部分消息;第一次查询信息时,它会向下发送整个状态,此后只要设备状态发生变化,就会向流中发送一条新的多部分消息,其中仅包含更改。

我正在使用 C# 并且有兴趣使用 HttpClient 或等价物来打开流,读取流中当前的所有信息,然后在有新信息时监视此流,以便我可以更新状态信息应用程序中相应的设备。

本质上我的代码看起来像这样

using (var handler = new HttpClientHandler { Credentials = new NetworkCredential(username, password) })
{
    using (var client = new HttpClient(handler))
    {
        var task = client.GetStreamAsync(uri);

        task.Wait();

        var stream = task.Result;

        while(true)
        {
            byte[] bytes = ReadBytesFromStream(stream);

            DoSomethingWithBytes(bytes);
        }
}

然而,现实生活中的代码在线程中运行,并且需要在被告知时正确终止。

我遇到的问题是,当流中没有任何内容时,stream.ReadByte() 上的读取调用会阻塞。如果我在流上放置一个 ReadTimeout 然后当 Read 调用失败时(即当没有新信息准备好时)然后 CanRead 属性 设置为 false 并且我必须重新启动该过程但是这样做会收到所有原始信息再次显示状态信息,而不仅仅是已更改的元素。

有什么办法可以使流保持活动状态,直到我告诉它终止,同时在没有可用信息的情况下能够在读取时解除阻塞吗?我需要这样做的原因是因为应用程序是多线程的,所以我需要安全地终止此代码并且读取会阻止应用程序关闭。

我没有使用 HttpClient,而是使用 HttpWebRequest 并将 KeepAlive 设置为 true,将 AllowReadStreamBuffering 属性设置为 true。这使流保持活动状态,并允许您在字节可用时读取字节。

通过保留对从 GetResponseStream 返回的网络流的引用,我们可以在 NetworkStream 上调用 Dispose,它会中断当前正在发生的任何读取,否则读取可以阻塞,只要它需要,即直到它收到解决线程生命周期问题的数据。

处理"I/O operation blocks my thread"问题的正确方法是使用异步I/O。 .NET 网络组件在这里提供了许多选项,但在您的情况下,您似乎正在从流中读取甚至(错误地)使用 GetStreamAsync() 方法,因此可以清理代码以正确和干净地处理.

例如:

async Task ExecuteUriAsync(string username, string password, Uri uri)
{
    using (var handler = new HttpClientHandler { Credentials = new NetworkCredential(username, password) })
    {
        using (var client = new HttpClient(handler))
        {
            Stream stream = await client.GetStreamAsync(uri);
            byte[] buffer = new byte[10240];

            while(true)
            {
                int byteCount = await stream.ReadAsync(buffer, 0, buffer.Length);

                if (byteCount == 0)
                {
                    // end-of-stream...must be done with the connection
                    return;
                }
                DoSomethingWithBytes(bytes, byteCount);
            }
        }
    }
}

您的 post 对您之前的 ReadBytesFromStream() 方法的作用和 DoSomethingWithBytes() 的作用含糊不清,但大概您可以弄清楚如何在上面整合该逻辑。