如何在 C# 中从长期存在的 HTTP 连接接收实时数据?

How do I receive live data from a long lived HTTP connection in C#?

我希望在 发生 时,从我正在与 集成的服务器中处理 long-lived HTTP connection 的结果。此服务器 returns 一行 JSON(\n 分隔)每个 "event" 我希望处理。

给定一个 Stream 的实例,该实例分配给表示来自打开的 HTTP 连接的字节的变量 changeStream,这里是我正在做的事情的一个提取示例:

requestWebRequest 的实例,配置为打开与我正在集成的服务器的连接。)

var response = request.GetResponse();
var changeStream = response.GetResponseStream();

var lineByLine = Observable.Using(
    () => new StreamReader(changeStream),
    (streamReader) => streamReader.ReadLineAsync().ToObservable()
);

lineByLine.Subscribe((string line) =>
{
    System.Console.WriteLine($"JSON! ---------=> {line}");
});

使用上面的代码,最终发生的是我收到服务器发送的第一行,然后是 none。既不是来自初始响应,也不是实时生成的新响应 activity.

  • 就我的问题而言,假设此连接将无限期保持打开状态。
  • 如何让 system.reactive 在遇到它们时为每一行触发回调?

Please note: This scenario is not a candidate for switching to SignalR

您的尝试只会观察到一次 ReadLineAsync 调用。相反,您需要 return 每行。大概是这样的;

Observable.Create<string>(async o => {
    var response = await request.GetResponseAsync();
    var changeStream = response.GetResponseStream();
    using var streamReader = new StreamReader(changeStream);
    while (true)
    {
        var line = await streamReader.ReadLineAsync();
        if (line == null)
            break;
        o.OnNext(line);
    }
    o.OnCompleted();
});

尽管这看起来更复杂,但最好使用内置运算符来完成这项工作。

IObservable<string> query =
    Observable
        .FromAsync(() => request.GetResponseAsync())
        .SelectMany(
            r => Observable.Using(
                () => r,
                r2 => Observable.Using(
                    () => r2.GetResponseStream(),
                    rs => Observable.Using(
                        () => new StreamReader(rs),
                        sr =>
                            Observable
                                .Defer(() => Observable.Start(() => sr.ReadLine()))
                                .Repeat()
                                .TakeWhile(w => w != null)))));

未经测试,但应该接近。