将两个无限的 C# IEnumerables 连接在一起,没有特定的顺序

Concatenate two infinite C# IEnumerables together in no particular order

我有两个方法,每个方法都 return 无限 IEnumerable 永无止境。我想连接它们,这样每当 IEnumerables return 中的任何一个值时,我都可以立即获取并处理它。

        static void Main(string[] args)
        {
            var streamOfBoth = get1().Concat(get2());
            foreach(var item in streamOfBoth)
            {
                Console.WriteLine(item);
               // I'd expect mixed numbers 1 and 2
               // Instead I receive only 1s
            }
         }

        static IEnumerable<int> get1()
        {
            while (true)
            {
                System.Threading.Thread.Sleep(1000);
                yield return 1;
            }
        }

        static IEnumerable<int> get2()
        {
            while (true)
            {
                System.Threading.Thread.Sleep(200);
                yield return 2;
            }
        }

有没有一种方法可以在不使用线程的情况下使用 IEnumerables 执行此操作?

您可能想要 交错 get1get2(从 get1 中获取项目,然后从 get2 中获取项目,然后再次来自 get1,然后来自 get2 等)。广义的(IEnumerable<T>不一定是无穷大且大小相同)Interleave<T>扩展方法可以这样:

   public static partial class EnumerableExtensions {
     public static IEnumerable<T> Interleave<T>(this IEnumerable<T> source, 
                                                params IEnumerable<T>[] others) {
      if (null == source)
        throw new ArgumentNullException(nameof(source));
      else if (null == others)
        throw new ArgumentNullException(nameof(others));

      IEnumerator<T>[] enums = new IEnumerator<T>[] { source.GetEnumerator() }
          .Concat(others
          .Where(item => item != null)
          .Select(item => item.GetEnumerator()))
        .ToArray();

      try {
        bool hasValue = true;

        while (hasValue) {
          hasValue = false;

          for (int i = 0; i < enums.Length; ++i) {
            if (enums[i] != null && enums[i].MoveNext()) {
              hasValue = true;

              yield return enums[i].Current;
            }
            else {
              enums[i].Dispose();
              enums[i] = null;
            }
          }
        }
      }
      finally {
        for (int i = enums.Length - 1; i >= 0; --i)
          if (enums[i] != null)
            enums[i].Dispose();
      }
    }
  }

然后使用它:

   var streamOfBoth = get1().Interleave(get2());

   foreach(var item in streamOfBoth)
   {
       Console.WriteLine(item);
   }

编辑: if

"whenever any ... return a value, I can instantly get and process"

是你问题中的关键短语你可以尝试BlockingCollection并实施producer-consumer pattern:

static BlockingCollection<int> streamOfBoth = new BlockingCollection<int>();

// Producer #1 
static void get1() {
  while (true) {
    System.Threading.Thread.Sleep(1000);

    streamOfBoth.Add(1); // value (1) is ready and pushed into streamOfBoth
  }
}

// Producer #2
static void get2() {
  while (true) {
    System.Threading.Thread.Sleep(200);

    streamOfBoth.Add(2); // value (2) is ready and pushed into streamOfBoth
  }
}

...

Task.Run(() => get1()); // Start producer #1
Task.Run(() => get2()); // Start producer #2

...

// Cosumer: when either Producer #1 or Producer #2 create a value
// consumer can starts process it 
foreach(var item in streamOfBoth.GetConsumingEnumerable()) {
  Console.WriteLine(item);
}

这很容易通过 System.Reactive

实现
static void Main()
{
    get1().ToObservable(TaskPoolScheduler.Default).Subscribe(Print);
    get2().ToObservable(TaskPoolScheduler.Default).Subscribe(Print);
}

static void Print(int i)
{
    Console.WriteLine(i);
}

static IEnumerable<int> get1()
{
    while (true)
    {
        System.Threading.Thread.Sleep(1000);
        yield return 1;
    }
}

static IEnumerable<int> get2()
{
    while (true)
    {
        System.Threading.Thread.Sleep(200);
        yield return 2;
    }
}

这会在我的机器上产生以下输出:

2 2 2 2 1 2 2 2 2 2 1 2 2 2 2 2 1 2 2 2 2 2 1 2 2 2 2 2 1 2 2 2 2 2 1 ...

注意 ToObservable 是用参数 TaskPoolScheduler.Default 调用的;只调用 ToObservable 而没有它会导致同步执行,这意味着它将永远枚举第一个序列而永远不会到达第二个序列。

这是一个合并 IEnumerable 的通用 Merge 方法。每个 IEnumerable 在专用线程中枚举。

using System.Reactive.Linq;
using System.Reactive.Concurrency;

public static IEnumerable<T> Merge<T>(params IEnumerable<T>[] sources)
{
    IEnumerable<IObservable<T>> observables = sources
        .Select(source => source.ToObservable(NewThreadScheduler.Default));
    IObservable<T> merged = Observable.Merge(observables);
    return merged.ToEnumerable();
}

用法示例:

var streamOfBoth = Merge(get1(), get2());

枚举结果 IEnumerable 将阻塞当前线程,直到枚举完成。

此实现取决于 System.Reactive and System.Interactive.Async 包。