使用 Rx 为 webservice 调用创建一个轮询请求

Using Rx create a polling request for webservice call

在 C# 中使用 Rx 我正在尝试创建对 REST API 的轮询请求。我面临的问题是,Observable 需要按顺序发送响应。意味着如果请求 A 在 X 时间进行并且请求 B 在 X + dx 时间进行并且 B 的响应在 A 之前进行,则 Observable 表达式应该忽略或取消请求 A.

我写了一个示例代码来描述这个场景。我该如何修复它以仅获取最新响应并取消或忽略以前的响应。

 class Program
    {
        static int i = 0;

        static void Main(string[] args)
        {
            GenerateObservableSequence();

            Console.ReadLine();
        }

        private static void GenerateObservableSequence()
        {
            var timerData = Observable.Timer(TimeSpan.Zero,
                TimeSpan.FromSeconds(1));

            var asyncCall = Observable.FromAsync<int>(() =>
            {
                TaskCompletionSource<int> t = new TaskCompletionSource<int>();
                i++;

                int k = i;
                var rndNo = new Random().Next(3, 10);
                Task.Delay(TimeSpan.FromSeconds(rndNo)).ContinueWith(r => { t.SetResult(k); });
                return t.Task;
            });

            var obs = from t in timerData
            from data in asyncCall
            select data;

            var hot = obs.Publish();
            hot.Connect();

                hot.Subscribe(j => 
            {
                Console.WriteLine("{0}", j);
            });
        }
    }

@Enigmativity 回答后:添加了 Polling Aync 功能以始终获取最新响应:

 public static IObservable<T> PollingAync<T> (Func<Task<T>> AsyncCall, double TimerDuration)
        {
            return Observable
         .Create<T>(o =>
         {
             var z = 0L;
             return
                 Observable
                     .Timer(TimeSpan.Zero, TimeSpan.FromSeconds(TimerDuration))
                     .SelectMany(nr =>
                         Observable.FromAsync<T>(AsyncCall),
                         (nr, obj) => new { nr, obj})
                     .Do(res => z = Math.Max(z, res.nr))
                     .Where(res => res.nr >= z)
                     .Select(res => res.obj)
                     .Subscribe(o);
         });

    }

这是一种常见的情况,可以简单地解决。

您的示例代码的关键部分是

var obs = from t in timerData
          from data in asyncCall
          select data;

这可以读作"for each value in timerData get all the values in asyncCall"。这是 SelectMany(或 FlatMap)运算符。 SelectMany 运算符将从内部序列 (asyncCall) 中获取所有值,并 return 它们收到的值。这意味着您可以获得乱序值。

你想要的是当外部序列(timerData)产生新值时取消之前的内部序列。为此,我们想改用 Switch 运算符。

var obs = timerData.Select(_=>asyncCall)
                   .Switch();

完整的代码可以清理到以下内容。 (删除了多余的 Publish/Connect,按键时取消订阅)

class 程序 { 静态整数 i = 0;

    static void Main(string[] args)
    {
        using (GenerateObservableSequence().Subscribe(x => Console.WriteLine(x)))
        {
            Console.ReadLine();
        }
    }

    private static IObservable<int> GenerateObservableSequence()
    {
        var timerData = Observable.Timer(TimeSpan.Zero,
            TimeSpan.FromSeconds(1));

        var asyncCall = Observable.FromAsync<int>(() =>
        {
            TaskCompletionSource<int> t = new TaskCompletionSource<int>();
            i++;

            int k = i;
            var rndNo = new Random().Next(3, 10);
            Task.Delay(TimeSpan.FromSeconds(rndNo)).ContinueWith(r => { t.SetResult(k); });
            return t.Task;
        });

        return from t in timerData
               from data in asyncCall
               select data;
    }
}

--编辑--

看来我误解了问题。而@Enigmativity 提供了更准确的答案。这是对他的回答的清理。

//Probably should be a field?
var rnd = new Random();
var obs = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1))
        //.Select(n => new { n, r = ++i })
        //No need for the `i` counter. Rx does this for us with this overload of `Select`
        .Select((val, idx) => new { Value = val, Index = idx})
        .SelectMany(nr =>
            Observable.Timer(TimeSpan.FromSeconds(rnd.Next(3, 10))),
            (nr, _) => nr)
        //.Do(nr => z = Math.Max(z, nr.n))
        //.Where(nr => nr.n >= z)
        //Replace external State and Do with scan and Distinct
        .Scan(new { Value = 0L, Index = -1 }, (prev, cur) => {
            return cur.Index > prev.Index
                ? cur
                : prev;
        })
        .DistinctUntilChanged()
        .Select(nr => nr.Value)
        .Dump();

让我们从简化代码开始。

这基本上是相同的代码:

var rnd = new Random();

var i = 0;

var obs =
    from n in Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1))
    let r = ++i
    from t in Observable.Timer(TimeSpan.FromSeconds(rnd.Next(3, 10)))
    select r;

obs.Subscribe(Console.WriteLine);

我得到这样的结果:

2
1
3
4
8
5
11
6
9
7
10

或者,这可以写成:

var obs =
    Observable
        .Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1))
        .Select(n => ++i)
        .SelectMany(n =>
            Observable.Timer(TimeSpan.FromSeconds(rnd.Next(3, 10))), (n, _) => n);

那么,现在满足您的要求:

If request A went at X Time and request B went at X + dx time and response of B came before A the Observable expression should ignore or cancel request A.

代码如下:

var rnd = new Random();

var i = 0;
var z = 0L;

var obs =
    Observable
        .Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1))
        .Select(n => new { n, r = ++i })
        .SelectMany(nr =>
            Observable.Timer(TimeSpan.FromSeconds(rnd.Next(3, 10))), (nr, _) => nr)
        .Do(nr => z = Math.Max(z, nr.n))
        .Where(nr => nr.n >= z)
        .Select(nr => nr.r);

我不喜欢那样使用 .Do,但我还想不出替代方案。

这给了这种东西:

1
5
8
9
10
11
14
15
16
17
22

请注意,这些值只是递增的。

现在,您真的应该使用 Observable.Create 来封装您正在使用的状态。所以你的最终观察结果应该是这样的:

var obs =
    Observable
        .Create<int>(o =>
        {
            var rnd = new Random();
            var i = 0;
            var z = 0L;
            return
                Observable
                    .Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1))
                    .Select(n => new { n, r = ++i })
                    .SelectMany(nr =>
                        Observable.Timer(TimeSpan.FromSeconds(rnd.Next(3, 10))),
                        (nr, _) => nr)
                    .Do(nr => z = Math.Max(z, nr.n))
                    .Where(nr => nr.n >= z)
                    .Select(nr => nr.r)
                    .Subscribe(o);
        });