在一次传递中多次使用 IEnumerable

Consuming an IEnumerable multiple times in one pass

是否可以编写一个高阶函数,使 IEnumerable 被多次使用,但只通过一次,而不将所有数据读入内存? [请参阅下面的编辑以了解我正在寻找的内容。]

例如,在下面的代码中,可枚举项是 mynums(我在其上标记了 .Trace() 以查看我们枚举了多少次)。目标是弄清楚它是否有任何大于 5 的数字,以及所有数字的总和。处理可枚举两次的函数是 Both_TwoPass,但它枚举了两次。相比之下 Both_NonStream 只枚举它一次,但以将其读入内存为代价。原则上,如 Any5Sum 所示,可以单次通过并以流方式执行这两项任务,但这是特定的解决方案。是否可以编写一个与 Both_* 具有相同签名的函数,但这是两全其美的方法?

(在我看来,这应该可以使用线程。是否有更好的解决方案,比如使用 async?)

编辑

下面是关于我正在寻找的内容的说明。我所做的是在方括号中包含对每个 属性 的非常脚踏实地的描述。

我正在寻找具有以下特征的函数Both

  1. 它有签名 (S1, S2) Both<T, S1, S2>(this IEnumerable<T> tt, Func<IEnumerable<T>, S1>, Func<IEnumerable<T>, S2>)(并产生 "right" 输出!)
  2. 它只迭代第一个参数 tt 一次。 [我的意思是,当传递 mynums(定义如下)时,它只输出 mynums: 0 1 2 ... 一次。这排除了函数 Both_TwoPass.]
  3. 它以流方式处理来自第一个参数 tt 的数据。 [我的意思是,例如,没有足够的内存来同时在内存中存储来自 tt 的所有项目,从而排除函数 Both_NonStream。]
using System;
using System.Collections.Generic;
using System.Linq;

namespace ConsoleApp
{
    static class Extensions
    {
        public static IEnumerable<T> Trace<T>(this IEnumerable<T> tt, string msg = "")
        {
            Console.Write(msg);
            try
            {
                foreach (T t in tt)
                {
                    Console.Write(" {0}", t);
                    yield return t;
                }
            }
            finally
            {
                Console.WriteLine('.');
            }
        }

        public static (S1, S2) Both_TwoPass<T, S1, S2>(this IEnumerable<T> tt, Func<IEnumerable<T>, S1> f1, Func<IEnumerable<T>, S2> f2)
        {
            return (f1(tt), f2(tt));
        }

        public static (S1, S2) Both_NonStream<T, S1, S2>(this IEnumerable<T> tt, Func<IEnumerable<T>, S1> f1, Func<IEnumerable<T>, S2> f2)
        {
            var tt2 = tt.ToList();
            return (f1(tt2), f2(tt2));
        }

        public static (bool, int) Any5Sum(this IEnumerable<int> ii)
        {
            int sum = 0;
            bool any5 = false;
            foreach (int i in ii)
            {
                sum += i;
                any5 |= i > 5; // or: if (!any5) any5 = i > 5;
            }
            return (any5, sum);
        }

    }
    class Program
    {
        static void Main()
        {
            var mynums = Enumerable.Range(0, 10).Trace("mynums:");
            Console.WriteLine("TwoPass: (any > 5, sum) = {0}", mynums.Both_TwoPass(tt => tt.Any(k => k > 5), tt => tt.Sum()));
            Console.WriteLine("NonStream: (any > 5, sum) = {0}", mynums.Both_NonStream(tt => tt.Any(k => k > 5), tt => tt.Sum()));
            Console.WriteLine("Manual: (any > 5, sum) = {0}", mynums.Any5Sum());
        }
    }
}

您编写计算模型的方式(即 return (f1(tt), f2(tt)))无法避免枚举的多次迭代。您基本上是在说计算 Item1 然后计算 Item2.

您必须将模型从 (Func<IEnumerable<T>, S1>, Func<IEnumerable<T>, S2>) 更改为 (Func<T, S1>, Func<T, S2>) 或更改为 Func<IEnumerable<T>, (S1, S2)> 才能运行 并行计算。

您实施的 Any5Sum 基本上是第二种方法 (Func<IEnumerable<T>, (S1, S2)>)。但是已经有一个内置的方法。

试试这个:

Console.WriteLine("Aggregate: (any > 5, sum) = {0}",
    mynums
        .Aggregate<int, (bool any5, int sum)>(
            (false, 0),
            (a, x) => (a.any5 | x > 5, a.sum + x)));

这里的核心问题是谁负责调用Enumeration.MoveNext()(例如通过使用foreach 循环)。跨线程同步多个 foreach 循环会很慢,而且很难正确处理。

实施IAsyncEnumerable<T>,让多个await foreach循环轮流处理物品会更容易。但是还是很傻

所以更简单的解决方案是更改问题。与其尝试调用同时尝试枚举项目的多个方法,不如更改界面以简单地访问每个项目。

我认为 and are describing the same thing in the comments. There is no need to create such a "special-purpose IEnumerable", though, because the BlockingCollection<> class 这种生产者-消费者场景已经存在。您将按如下方式使用它...

  • 为每个消费函数创建一个 BlockingCollection<>(即 tt1tt2)。
    • 默认情况下,BlockingCollection<> 包裹 ConcurrentQueue<>,因此元素将按 FIFO 顺序到达。
    • 为了满足一次只有一个元素保存在内存中的要求,将为 bounded capacity 指定 1。请注意,此容量是针对每个集合的,因此对于两个集合,在任何给定时刻最多会有两个排队的元素。
    • 每个集合都将保存该消费者的输入元素。
  • 为每个消费函数创建一个thread/task。
    • thread/task 将简单地为其输入集合调用 GetConsumingEnumerator(),将结果 IEnumerable<> 传递给它的消费函数,然后 return 那个结果。
      • GetConsumingEnumerable() 就像它的名字所暗示的那样:它创建一个 IEnumerable<> 消耗 (删除)集合中的元素。如果集合为空,枚举将阻塞直到添加元素。 CompleteAdding() 在生产者完成后调用,这允许使用枚举器在集合清空时退出。
  • 生产者枚举 IEnumerable<>tt,并将每个元素添加到两个集合中。这是唯一一次 tt 被枚举。
  • 一旦 tt 被完全枚举,CompleteAdding() 将在每个集合上调用。
  • 一旦每个消费者 thread/task 完成,他们的结果就会 returned。

这是代码中的样子...

public static (S1, S2) Both<T, S1, S2>(this IEnumerable<T> tt, Func<IEnumerable<T>, S1> tt1, Func<IEnumerable<T>, S2> tt2)
{
    const int MaxQueuedElementsPerCollection = 1;

    using (BlockingCollection<T> collection1 = new BlockingCollection<T>(MaxQueuedElementsPerCollection))
    using (Task<S1> task1 = StartConsumerTask(collection1, tt1))
    using (BlockingCollection<T> collection2 = new BlockingCollection<T>(MaxQueuedElementsPerCollection))
    using (Task<S2> task2 = StartConsumerTask(collection2, tt2))
    {
        foreach (T element in tt)
        {
            collection1.Add(element);
            collection2.Add(element);
        }

        // Inform any enumerators created by .GetConsumingEnumerable()
        // that there will be no more elements added.
        collection1.CompleteAdding();
        collection2.CompleteAdding();

        // Accessing the Result property blocks until the Task<> is complete.
        return (task1.Result, task2.Result);
    }

    Task<S> StartConsumerTask<S>(BlockingCollection<T> collection, Func<IEnumerable<T>, S> func)
    {
        return Task.Run(() => func(collection.GetConsumingEnumerable()));
    }
}

请注意,为了提高效率,您可以将 MaxQueuedElementsPerCollection 增加到 10100,这样消费者就不必 运行彼此步调一致。

不过,此代码有一个问题。当集合为空时,消费者必须等待生产者生产一个元素,而当集合已满时,生产者必须等待消费者消费一个元素。考虑在执行 tt => tt.Any(k => k > 5) lambda...

过程中发生了什么
  1. 生产者等待集合未满并添加5
  2. 消费者等待集合非空并移除5
    • 5 > 5 returns false 并且枚举继续。
  3. 生产者等待集合未满并添加6
  4. 消费者等待集合非空并移除6
    • 6 > 5 returns true 并且枚举停止。 Any()、lambda 和消费者任务全部 return。
  5. 生产者等待集合未满并添加7
  6. 生产者等待集合未满,但......从未发生过!
    • 消费者已经放弃了枚举,所以它不会消​​耗任何元素来为新元素腾出空间。 Add() 永远不会 return.

我能想到的防止这种死锁的最干净的方法是确保枚举整个集合,即使 func 不这样做也是如此。这只需要对 StartConsumerTask<>() local method...

进行简单更改
Task<S> StartConsumerTask<S>(BlockingCollection<T> collection, Func<IEnumerable<T>, S> func)
{
    return Task.Run(
        () => {
            try
            {
                return func(collection.GetConsumingEnumerable());
            }
            finally
            {
                // Prevent BlockingCollection<>.Add() calls from
                // deadlocking by ensuring the entire collection gets
                // consumed even if func abandoned its enumeration early.
                foreach (T element in collection.GetConsumingEnumerable())
                {
                    // Do nothing...
                }
            }
        }
    );
}

这样做的缺点是 tt 将始终枚举完成,即使 both tt1tt2 放弃了他们的枚举器早。

解决了这个问题,这...

static void Main()
{
    IEnumerable<int> mynums = Enumerable.Range(0, 10).Trace("mynums:");

    Console.WriteLine("Both: (any > 5, sum) = {0}", mynums.Both(tt => tt.Any(k => k > 5), tt => tt.Sum()));
}

...输出这个...

mynums: 0 1 2 3 4 5 6 7 8 9.
Both: (any > 5, sum) = (True, 45)

我相信有可能满足问题的所有要求,还有一个(非常自然的)要求,即如果两个 Func<IEnumerable<T>, S> 中的每一个都部分消耗原始可枚举,则只能部分枚举原始可枚举. (@BACON 对此进行了讨论)。该方法在我的 GitHub repo 'CoEnumerable'. The idea is that the Barrier 中进行了更详细的讨论 class 提供了一种相当简单的方法来实现代理 IEnumerable,每个 Func<IEnumerable<T>, S> 都可以在代理时使用该代理只消耗真实的 IEnumerable 一次。特别是,该实现只消耗了绝对必要的原始可枚举量(即满足上述额外要求)。

代理是:

class BarrierEnumerable<T> : IEnumerable<T>
{
    private Barrier barrier;
    private bool moveNext;
    private readonly Func<T> src;

    public BarrierEnumerable(IEnumerator<T> enumerator)
    {
        src = () => enumerator.Current;
    }

    public Barrier Barrier
    {
        set => barrier = value;
    }

    public bool MoveNext
    {
        set => moveNext = value;
    }

    public IEnumerator<T> GetEnumerator()
    {
        try
        {
            while (true)
            {
                barrier.SignalAndWait();
                if (moveNext)
                {
                    yield return src();
                }
                else
                {
                    yield break;
                }
            }
        }
        finally
        {
            barrier.RemoveParticipant();
        }
    }

    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}

据此我们可以合并两个消费者

public static T Combine<S, T1, T2, T>(this IEnumerable<S> source,
    Func<IEnumerable<S>, T1> coenumerable1,
    Func<IEnumerable<S>, T2> coenumerable2,
    Func<T1, T2, T> resultSelector)
{
    using var ss = source.GetEnumerator();
    var enumerable1 = new BarrierEnumerable<S>(ss);
    var enumerable2 = new BarrierEnumerable<S>(ss);
    using var barrier = new Barrier(2, _ => enumerable1.MoveNext = enumerable2.MoveNext = ss.MoveNext());
    enumerable2.Barrier = enumerable1.Barrier = barrier;
    using var t1 = Task.Run(() => coenumerable1(enumerable1));
    using var t2 = Task.Run(() => coenumerable2(enumerable2));
    return resultSelector(t1.Result, t2.Result);
}

GitHub 存储库有几个使用上述代码的示例,以及一些简短的设计讨论(包括限制)。