循环推送值列表的 Observable

Observable that cyclicly pushes a list of values

我想创建一个 Observable,它每隔 t 秒不断推送一个值列表。

例如,给定 {1, 2, 3, 4} 个订阅者应该得到这个:

1,2,3,4,1,2,3,4,1,2,3,4,1,2...

class Program
{
    static void Main()
    {
        var observable = Observable
            .Interval(TimeSpan.FromSeconds(3))
            .Zip(Observable.Range(1, 4)
            .Repeat(), (_, count) => count);

        observable.Subscribe(Console.WriteLine);

        Console.WriteLine("Finished!");
    }
}

我删除了我建议的答案,因为 Timothy 和 Lee 的答案都使用 built-in Rx 函数并且更加优雅。不过,我会留下对问题的解释,因为我认为它很有用:

Observables 应该是推送序列,并且 Zip 将在等待来自第二个的值与下一个配对时从更快的生产流中排队项目。由于 Obsevable.Range returns 这些值与订阅者可以处理的一样快,这会填满您的所有内存并阻塞线程。

在 System.Reactive.Concurrency 命名空间中,有一些 类 和方法可以帮助进行调度。

以下代码是每 3 秒打印一次整数数组内容的粗略示例:

var numbers = new int[] { 1,2,3,4 };

var scs = new SynchronizationContextScheduler(new SynchronizationContext());
scs.SchedulePeriodic<int[]>(numbers, TimeSpan.FromSeconds(3), (n) => 
{
    foreach (var number in n)
    {
        Console.Write(number + " ");
    }
});

Console.ReadLine();

我不确定这是否是您要查找的内容,但希望对您有所帮助。

Charles Mager 的回答已经解决了您当前代码无法正常工作的原因,并提出了一种修复方法。这是我能想到的最简单的修复方法:

    var observable = Observable.Zip(
        Observable.Interval(TimeSpan.FromSeconds(3)),
        Enumerable.Range(1, 4).Repeat(),
        (_, count) => count);

This is just using the version of Zip that combines an IObservable and an IEnumerable.

Repeat扩展方法定义如下(同Charles Mager的回答):

public static IEnumerable<T> Repeat(this IEnumerable<T> source)
{
    while (true)
    {
        foreach (var item in source)
        {
            yield return item;
        }
    }
}

这对我来说似乎很简单 mis-placed .Repeat()

class Program
{
    static void Main()
    {
        var observable = Observable
            .Interval(TimeSpan.FromSeconds(3))
            .Zip(Observable.Range(1, 4), (_, count) => count)
            .Repeat();

        observable.Subscribe(Console.WriteLine);

        Console.WriteLine("Finished!");
        Console.ReadLine();
    }
}

现在将:

  • 不阻止控制台完成
  • 不抛出 OutOfMemoryException。

注意,没有使用 .Do(),没有自定义扩展方法,没有无限产生阻塞线程的 IEnumerables ;-)

...还有一个没有 Zip 的替代实现,希望外行开发人员可以阅读和理解(也可以处理!):

class Program
{
    static void Main()
    {
        var observable = Observable
            .Interval(TimeSpan.FromSeconds(3))
            //.Zip(Observable.Range(1, 4), (_, count) => count)
            .Select(i=>i+1)
            .Take(4)
            .Repeat();

        using (observable.Subscribe(Console.WriteLine))
        {
            Console.WriteLine("Running...");
            Console.ReadLine();
        }
        Console.WriteLine("Finished!");
    }
}