使用 Rx 为 webservice 调用创建一个轮询请求
Using Rx create a polling request for webservice call
在 C# 中使用 Rx 我正在尝试创建对 REST API 的轮询请求。我面临的问题是,Observable 需要按顺序发送响应。意味着如果请求 A 在 X 时间进行并且请求 B 在 X + dx 时间进行并且 B 的响应在 A 之前进行,则 Observable 表达式应该忽略或取消请求 A.
我写了一个示例代码来描述这个场景。我该如何修复它以仅获取最新响应并取消或忽略以前的响应。
class Program
{
static int i = 0;
static void Main(string[] args)
{
GenerateObservableSequence();
Console.ReadLine();
}
private static void GenerateObservableSequence()
{
var timerData = Observable.Timer(TimeSpan.Zero,
TimeSpan.FromSeconds(1));
var asyncCall = Observable.FromAsync<int>(() =>
{
TaskCompletionSource<int> t = new TaskCompletionSource<int>();
i++;
int k = i;
var rndNo = new Random().Next(3, 10);
Task.Delay(TimeSpan.FromSeconds(rndNo)).ContinueWith(r => { t.SetResult(k); });
return t.Task;
});
var obs = from t in timerData
from data in asyncCall
select data;
var hot = obs.Publish();
hot.Connect();
hot.Subscribe(j =>
{
Console.WriteLine("{0}", j);
});
}
}
@Enigmativity 回答后:添加了 Polling Aync 功能以始终获取最新响应:
public static IObservable<T> PollingAync<T> (Func<Task<T>> AsyncCall, double TimerDuration)
{
return Observable
.Create<T>(o =>
{
var z = 0L;
return
Observable
.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(TimerDuration))
.SelectMany(nr =>
Observable.FromAsync<T>(AsyncCall),
(nr, obj) => new { nr, obj})
.Do(res => z = Math.Max(z, res.nr))
.Where(res => res.nr >= z)
.Select(res => res.obj)
.Subscribe(o);
});
}
这是一种常见的情况,可以简单地解决。
您的示例代码的关键部分是
var obs = from t in timerData
from data in asyncCall
select data;
这可以读作"for each value in timerData
get all the values in asyncCall
"。这是 SelectMany
(或 FlatMap
)运算符。 SelectMany
运算符将从内部序列 (asyncCall
) 中获取所有值,并 return 它们收到的值。这意味着您可以获得乱序值。
你想要的是当外部序列(timerData
)产生新值时取消之前的内部序列。为此,我们想改用 Switch
运算符。
var obs = timerData.Select(_=>asyncCall)
.Switch();
完整的代码可以清理到以下内容。 (删除了多余的 Publish/Connect,按键时取消订阅)
class 程序
{
静态整数 i = 0;
static void Main(string[] args)
{
using (GenerateObservableSequence().Subscribe(x => Console.WriteLine(x)))
{
Console.ReadLine();
}
}
private static IObservable<int> GenerateObservableSequence()
{
var timerData = Observable.Timer(TimeSpan.Zero,
TimeSpan.FromSeconds(1));
var asyncCall = Observable.FromAsync<int>(() =>
{
TaskCompletionSource<int> t = new TaskCompletionSource<int>();
i++;
int k = i;
var rndNo = new Random().Next(3, 10);
Task.Delay(TimeSpan.FromSeconds(rndNo)).ContinueWith(r => { t.SetResult(k); });
return t.Task;
});
return from t in timerData
from data in asyncCall
select data;
}
}
--编辑--
看来我误解了问题。而@Enigmativity 提供了更准确的答案。这是对他的回答的清理。
//Probably should be a field?
var rnd = new Random();
var obs = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1))
//.Select(n => new { n, r = ++i })
//No need for the `i` counter. Rx does this for us with this overload of `Select`
.Select((val, idx) => new { Value = val, Index = idx})
.SelectMany(nr =>
Observable.Timer(TimeSpan.FromSeconds(rnd.Next(3, 10))),
(nr, _) => nr)
//.Do(nr => z = Math.Max(z, nr.n))
//.Where(nr => nr.n >= z)
//Replace external State and Do with scan and Distinct
.Scan(new { Value = 0L, Index = -1 }, (prev, cur) => {
return cur.Index > prev.Index
? cur
: prev;
})
.DistinctUntilChanged()
.Select(nr => nr.Value)
.Dump();
让我们从简化代码开始。
这基本上是相同的代码:
var rnd = new Random();
var i = 0;
var obs =
from n in Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1))
let r = ++i
from t in Observable.Timer(TimeSpan.FromSeconds(rnd.Next(3, 10)))
select r;
obs.Subscribe(Console.WriteLine);
我得到这样的结果:
2
1
3
4
8
5
11
6
9
7
10
或者,这可以写成:
var obs =
Observable
.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1))
.Select(n => ++i)
.SelectMany(n =>
Observable.Timer(TimeSpan.FromSeconds(rnd.Next(3, 10))), (n, _) => n);
那么,现在满足您的要求:
If request A went at X Time and request B went at X + dx time and response of B came before A the Observable expression should ignore or cancel request A.
代码如下:
var rnd = new Random();
var i = 0;
var z = 0L;
var obs =
Observable
.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1))
.Select(n => new { n, r = ++i })
.SelectMany(nr =>
Observable.Timer(TimeSpan.FromSeconds(rnd.Next(3, 10))), (nr, _) => nr)
.Do(nr => z = Math.Max(z, nr.n))
.Where(nr => nr.n >= z)
.Select(nr => nr.r);
我不喜欢那样使用 .Do
,但我还想不出替代方案。
这给了这种东西:
1
5
8
9
10
11
14
15
16
17
22
请注意,这些值只是递增的。
现在,您真的应该使用 Observable.Create
来封装您正在使用的状态。所以你的最终观察结果应该是这样的:
var obs =
Observable
.Create<int>(o =>
{
var rnd = new Random();
var i = 0;
var z = 0L;
return
Observable
.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1))
.Select(n => new { n, r = ++i })
.SelectMany(nr =>
Observable.Timer(TimeSpan.FromSeconds(rnd.Next(3, 10))),
(nr, _) => nr)
.Do(nr => z = Math.Max(z, nr.n))
.Where(nr => nr.n >= z)
.Select(nr => nr.r)
.Subscribe(o);
});
在 C# 中使用 Rx 我正在尝试创建对 REST API 的轮询请求。我面临的问题是,Observable 需要按顺序发送响应。意味着如果请求 A 在 X 时间进行并且请求 B 在 X + dx 时间进行并且 B 的响应在 A 之前进行,则 Observable 表达式应该忽略或取消请求 A.
我写了一个示例代码来描述这个场景。我该如何修复它以仅获取最新响应并取消或忽略以前的响应。
class Program
{
static int i = 0;
static void Main(string[] args)
{
GenerateObservableSequence();
Console.ReadLine();
}
private static void GenerateObservableSequence()
{
var timerData = Observable.Timer(TimeSpan.Zero,
TimeSpan.FromSeconds(1));
var asyncCall = Observable.FromAsync<int>(() =>
{
TaskCompletionSource<int> t = new TaskCompletionSource<int>();
i++;
int k = i;
var rndNo = new Random().Next(3, 10);
Task.Delay(TimeSpan.FromSeconds(rndNo)).ContinueWith(r => { t.SetResult(k); });
return t.Task;
});
var obs = from t in timerData
from data in asyncCall
select data;
var hot = obs.Publish();
hot.Connect();
hot.Subscribe(j =>
{
Console.WriteLine("{0}", j);
});
}
}
@Enigmativity 回答后:添加了 Polling Aync 功能以始终获取最新响应:
public static IObservable<T> PollingAync<T> (Func<Task<T>> AsyncCall, double TimerDuration)
{
return Observable
.Create<T>(o =>
{
var z = 0L;
return
Observable
.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(TimerDuration))
.SelectMany(nr =>
Observable.FromAsync<T>(AsyncCall),
(nr, obj) => new { nr, obj})
.Do(res => z = Math.Max(z, res.nr))
.Where(res => res.nr >= z)
.Select(res => res.obj)
.Subscribe(o);
});
}
这是一种常见的情况,可以简单地解决。
您的示例代码的关键部分是
var obs = from t in timerData
from data in asyncCall
select data;
这可以读作"for each value in timerData
get all the values in asyncCall
"。这是 SelectMany
(或 FlatMap
)运算符。 SelectMany
运算符将从内部序列 (asyncCall
) 中获取所有值,并 return 它们收到的值。这意味着您可以获得乱序值。
你想要的是当外部序列(timerData
)产生新值时取消之前的内部序列。为此,我们想改用 Switch
运算符。
var obs = timerData.Select(_=>asyncCall)
.Switch();
完整的代码可以清理到以下内容。 (删除了多余的 Publish/Connect,按键时取消订阅)
class 程序 { 静态整数 i = 0;
static void Main(string[] args)
{
using (GenerateObservableSequence().Subscribe(x => Console.WriteLine(x)))
{
Console.ReadLine();
}
}
private static IObservable<int> GenerateObservableSequence()
{
var timerData = Observable.Timer(TimeSpan.Zero,
TimeSpan.FromSeconds(1));
var asyncCall = Observable.FromAsync<int>(() =>
{
TaskCompletionSource<int> t = new TaskCompletionSource<int>();
i++;
int k = i;
var rndNo = new Random().Next(3, 10);
Task.Delay(TimeSpan.FromSeconds(rndNo)).ContinueWith(r => { t.SetResult(k); });
return t.Task;
});
return from t in timerData
from data in asyncCall
select data;
}
}
--编辑--
看来我误解了问题。而@Enigmativity 提供了更准确的答案。这是对他的回答的清理。
//Probably should be a field?
var rnd = new Random();
var obs = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1))
//.Select(n => new { n, r = ++i })
//No need for the `i` counter. Rx does this for us with this overload of `Select`
.Select((val, idx) => new { Value = val, Index = idx})
.SelectMany(nr =>
Observable.Timer(TimeSpan.FromSeconds(rnd.Next(3, 10))),
(nr, _) => nr)
//.Do(nr => z = Math.Max(z, nr.n))
//.Where(nr => nr.n >= z)
//Replace external State and Do with scan and Distinct
.Scan(new { Value = 0L, Index = -1 }, (prev, cur) => {
return cur.Index > prev.Index
? cur
: prev;
})
.DistinctUntilChanged()
.Select(nr => nr.Value)
.Dump();
让我们从简化代码开始。
这基本上是相同的代码:
var rnd = new Random();
var i = 0;
var obs =
from n in Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1))
let r = ++i
from t in Observable.Timer(TimeSpan.FromSeconds(rnd.Next(3, 10)))
select r;
obs.Subscribe(Console.WriteLine);
我得到这样的结果:
2 1 3 4 8 5 11 6 9 7 10
或者,这可以写成:
var obs =
Observable
.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1))
.Select(n => ++i)
.SelectMany(n =>
Observable.Timer(TimeSpan.FromSeconds(rnd.Next(3, 10))), (n, _) => n);
那么,现在满足您的要求:
If request A went at X Time and request B went at X + dx time and response of B came before A the Observable expression should ignore or cancel request A.
代码如下:
var rnd = new Random();
var i = 0;
var z = 0L;
var obs =
Observable
.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1))
.Select(n => new { n, r = ++i })
.SelectMany(nr =>
Observable.Timer(TimeSpan.FromSeconds(rnd.Next(3, 10))), (nr, _) => nr)
.Do(nr => z = Math.Max(z, nr.n))
.Where(nr => nr.n >= z)
.Select(nr => nr.r);
我不喜欢那样使用 .Do
,但我还想不出替代方案。
这给了这种东西:
1 5 8 9 10 11 14 15 16 17 22
请注意,这些值只是递增的。
现在,您真的应该使用 Observable.Create
来封装您正在使用的状态。所以你的最终观察结果应该是这样的:
var obs =
Observable
.Create<int>(o =>
{
var rnd = new Random();
var i = 0;
var z = 0L;
return
Observable
.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1))
.Select(n => new { n, r = ++i })
.SelectMany(nr =>
Observable.Timer(TimeSpan.FromSeconds(rnd.Next(3, 10))),
(nr, _) => nr)
.Do(nr => z = Math.Max(z, nr.n))
.Where(nr => nr.n >= z)
.Select(nr => nr.r)
.Subscribe(o);
});