如何在 C# 中从长期存在的 HTTP 连接接收实时数据?
How do I receive live data from a long lived HTTP connection in C#?
我希望在 发生 时,从我正在与 集成的服务器中处理 long-lived HTTP connection 的结果。此服务器 returns 一行 JSON(\n
分隔)每个 "event" 我希望处理。
给定一个 Stream
的实例,该实例分配给表示来自打开的 HTTP 连接的字节的变量 changeStream
,这里是我正在做的事情的一个提取示例:
(request
是 WebRequest
的实例,配置为打开与我正在集成的服务器的连接。)
var response = request.GetResponse();
var changeStream = response.GetResponseStream();
var lineByLine = Observable.Using(
() => new StreamReader(changeStream),
(streamReader) => streamReader.ReadLineAsync().ToObservable()
);
lineByLine.Subscribe((string line) =>
{
System.Console.WriteLine($"JSON! ---------=> {line}");
});
使用上面的代码,最终发生的是我收到服务器发送的第一行,然后是 none。既不是来自初始响应,也不是实时生成的新响应 activity.
- 就我的问题而言,假设此连接将无限期保持打开状态。
- 如何让 system.reactive 在遇到它们时为每一行触发回调?
Please note: This scenario is not a candidate for switching to SignalR
您的尝试只会观察到一次 ReadLineAsync
调用。相反,您需要 return 每行。大概是这样的;
Observable.Create<string>(async o => {
var response = await request.GetResponseAsync();
var changeStream = response.GetResponseStream();
using var streamReader = new StreamReader(changeStream);
while (true)
{
var line = await streamReader.ReadLineAsync();
if (line == null)
break;
o.OnNext(line);
}
o.OnCompleted();
});
尽管这看起来更复杂,但最好使用内置运算符来完成这项工作。
IObservable<string> query =
Observable
.FromAsync(() => request.GetResponseAsync())
.SelectMany(
r => Observable.Using(
() => r,
r2 => Observable.Using(
() => r2.GetResponseStream(),
rs => Observable.Using(
() => new StreamReader(rs),
sr =>
Observable
.Defer(() => Observable.Start(() => sr.ReadLine()))
.Repeat()
.TakeWhile(w => w != null)))));
未经测试,但应该接近。
我希望在 发生 时,从我正在与 集成的服务器中处理 long-lived HTTP connection 的结果。此服务器 returns 一行 JSON(\n
分隔)每个 "event" 我希望处理。
给定一个 Stream
的实例,该实例分配给表示来自打开的 HTTP 连接的字节的变量 changeStream
,这里是我正在做的事情的一个提取示例:
(request
是 WebRequest
的实例,配置为打开与我正在集成的服务器的连接。)
var response = request.GetResponse();
var changeStream = response.GetResponseStream();
var lineByLine = Observable.Using(
() => new StreamReader(changeStream),
(streamReader) => streamReader.ReadLineAsync().ToObservable()
);
lineByLine.Subscribe((string line) =>
{
System.Console.WriteLine($"JSON! ---------=> {line}");
});
使用上面的代码,最终发生的是我收到服务器发送的第一行,然后是 none。既不是来自初始响应,也不是实时生成的新响应 activity.
- 就我的问题而言,假设此连接将无限期保持打开状态。
- 如何让 system.reactive 在遇到它们时为每一行触发回调?
Please note: This scenario is not a candidate for switching to SignalR
您的尝试只会观察到一次 ReadLineAsync
调用。相反,您需要 return 每行。大概是这样的;
Observable.Create<string>(async o => {
var response = await request.GetResponseAsync();
var changeStream = response.GetResponseStream();
using var streamReader = new StreamReader(changeStream);
while (true)
{
var line = await streamReader.ReadLineAsync();
if (line == null)
break;
o.OnNext(line);
}
o.OnCompleted();
});
尽管这看起来更复杂,但最好使用内置运算符来完成这项工作。
IObservable<string> query =
Observable
.FromAsync(() => request.GetResponseAsync())
.SelectMany(
r => Observable.Using(
() => r,
r2 => Observable.Using(
() => r2.GetResponseStream(),
rs => Observable.Using(
() => new StreamReader(rs),
sr =>
Observable
.Defer(() => Observable.Start(() => sr.ReadLine()))
.Repeat()
.TakeWhile(w => w != null)))));
未经测试,但应该接近。