如何使用Rx.Net实现状态轮询?
how to implement status polling using Rx.Net?
我已经玩了 2 天了,现在尝试使用 .Net 的响应式扩展来拥抱响应式编程的领域。
我构建了一个状态轮询用例,假设有一个虚拟网络 API 和一个轮询状态对象的反应式客户端。
我尝试了以下代码:
// Creates an observable that ticks each 1 second
var ticksObservable = Observable.Interval(TimeSpan.FromMilliseconds(1000));
// Creates a new observable transforming each tick to a string status requested from the api
var coldStatusPollerObservable = ticksObservable.Select(tick =>
{
Console.WriteLine("Sending Request");
var tsk = client.GetStatus(1); // Http get request to a web api resource (id == 1 just for demo)
tsk.Wait();
return tsk.Result;
}
);
// Subscribe and print results on console
coldStatusPollerObservable.Subscribe(
status => Console.WriteLine(status), ex => Console.WriteLine(ex.Message)
);
一切都很好,我得到了预期的输出:
{"status":"waiting"}
{"status":"running"}
{"status":"running"}
{"status":"running"}
{"status":"ok"}
然后我添加了另一个约束,它是从网络返回的随机错误请求 API。
发生的问题是我无法正确处理异常。
异常发生在 tsk.wait() 中,我期望它只会触发我传递给 Subscribe 方法的 onError 操作( ex => Console.WriteLine(ex.Message) )
Q1:在这种情况下处理异常的正确方法是什么?
Q2:是否有使用 Rx.NET 进行轮询的更简洁的实现?
PS: 我正在使用 Rx.NET 3.1.1
您基本上希望尽可能避免使用任何 .Wait()
阻塞调用。 Rx 带有一个专为处理任务而设计的运算符 - Observable.FromAsync
所以您的基本查询现在变为:
var coldStatusPollerObservable =
from tick in ticksObservable
from status in Observable.FromAsync(() => client.GetStatus(1))
select status;
如果您想要控制台消息,请执行以下操作:
var coldStatusPollerObservable =
from tick in ticksObservable
from status in Observable.FromAsync(() =>
{
Console.WriteLine("Sending Request");
return client.GetStatus(1);
})
select status;
请记住尽可能坚持使用内置运算符。
你可以这样处理异常:
void Main()
{
var coldStatusPollerObservable =
from tick in Observable.Interval(TimeSpan.FromMilliseconds(1000))
from status in
Observable
.FromAsync(() => client.GetStatus(1))
.Catch<string, Exception>(ex => Observable.Return("Error"))
select status;
coldStatusPollerObservable.Subscribe(x => Console.WriteLine(x), ex => Console.WriteLine(ex.Message));
}
public static class client
{
private static int _counter = 0;
public static Task<string> GetStatus(int id)
{
if (_counter++ == 5)
throw new Exception();
return Task.Run(() => _counter.ToString());
}
}
这给出:
1
2
3
4
5
Error
7
8
9
10
11
12
13
14
15
...
我已经玩了 2 天了,现在尝试使用 .Net 的响应式扩展来拥抱响应式编程的领域。
我构建了一个状态轮询用例,假设有一个虚拟网络 API 和一个轮询状态对象的反应式客户端。
我尝试了以下代码:
// Creates an observable that ticks each 1 second
var ticksObservable = Observable.Interval(TimeSpan.FromMilliseconds(1000));
// Creates a new observable transforming each tick to a string status requested from the api
var coldStatusPollerObservable = ticksObservable.Select(tick =>
{
Console.WriteLine("Sending Request");
var tsk = client.GetStatus(1); // Http get request to a web api resource (id == 1 just for demo)
tsk.Wait();
return tsk.Result;
}
);
// Subscribe and print results on console
coldStatusPollerObservable.Subscribe(
status => Console.WriteLine(status), ex => Console.WriteLine(ex.Message)
);
一切都很好,我得到了预期的输出:
{"status":"waiting"}
{"status":"running"}
{"status":"running"}
{"status":"running"}
{"status":"ok"}
然后我添加了另一个约束,它是从网络返回的随机错误请求 API。 发生的问题是我无法正确处理异常。 异常发生在 tsk.wait() 中,我期望它只会触发我传递给 Subscribe 方法的 onError 操作( ex => Console.WriteLine(ex.Message) )
Q1:在这种情况下处理异常的正确方法是什么? Q2:是否有使用 Rx.NET 进行轮询的更简洁的实现?
PS: 我正在使用 Rx.NET 3.1.1
您基本上希望尽可能避免使用任何 .Wait()
阻塞调用。 Rx 带有一个专为处理任务而设计的运算符 - Observable.FromAsync
所以您的基本查询现在变为:
var coldStatusPollerObservable =
from tick in ticksObservable
from status in Observable.FromAsync(() => client.GetStatus(1))
select status;
如果您想要控制台消息,请执行以下操作:
var coldStatusPollerObservable =
from tick in ticksObservable
from status in Observable.FromAsync(() =>
{
Console.WriteLine("Sending Request");
return client.GetStatus(1);
})
select status;
请记住尽可能坚持使用内置运算符。
你可以这样处理异常:
void Main()
{
var coldStatusPollerObservable =
from tick in Observable.Interval(TimeSpan.FromMilliseconds(1000))
from status in
Observable
.FromAsync(() => client.GetStatus(1))
.Catch<string, Exception>(ex => Observable.Return("Error"))
select status;
coldStatusPollerObservable.Subscribe(x => Console.WriteLine(x), ex => Console.WriteLine(ex.Message));
}
public static class client
{
private static int _counter = 0;
public static Task<string> GetStatus(int id)
{
if (_counter++ == 5)
throw new Exception();
return Task.Run(() => _counter.ToString());
}
}
这给出:
1 2 3 4 5 Error 7 8 9 10 11 12 13 14 15 ...