如何在没有缓冲的情况下使用单个枚举检查 IEnumerable 的多个条件?

How to check an IEnumerable for multiple conditions with a single enumeration without buffering?

我有一个很长的序列数据是IEnumerable的形式,我想根据一些条件来检查它。每个条件 returns 一个值为 true 或 false,我想知道是否所有条件都为真。我的问题是我无法通过调用 ToList 来实现 IEnumerable,因为它太长了(> 10,000,000,000 个元素)。我也不能多次枚举序列,每个条件一个,因为每次我都会得到不同的序列。我正在寻找一种执行此检查的有效方法,尽可能使用现有的 LINQ 功能。


澄清:我要求的是通用解决方案,而不是针对下面给出的特定示例问题的解决方案。


这是我的序列的虚拟版本:

static IEnumerable<int> GetLongSequence()
{
    var random = new Random();
    for (long i = 0; i < 10_000_000_000; i++) yield return random.Next(0, 100_000_000);
}

下面是序列必须满足的条件示例:

var source = GetLongSequence();
var result = source.Any(n => n % 28_413_803 == 0)
    && source.All(n => n < 99_999_999)
    && source.Average(n => n) > 50_000_001;

不幸的是,这种方法调用了 3 次 GetLongSequence,因此它不满足问题的要求。

我试着写了一个上面的Linqy扩展方法,希望这能给我一些想法:

public static bool AllConditions<TSource>(this IEnumerable<TSource> source,
    params Func<IEnumerable<TSource>, bool>[] conditions)
{
    foreach (var condition in conditions)
    {
        if (!condition(source)) return false;
    }
    return true;
}

这就是我打算如何使用它:

var result = source.AllConditions
(
    s => s.Any(n => n % 28_413_803 == 0),
    s => s.All(n => n < 99_999_999),
    s => s.Average(n => n) > 50_000_001,
    // more conditions...
);

不幸的是,这没有任何改进。 GetLongSequence 再次被调用了 3 次。

我用头撞墙一个小时,没有任何进展,我想出了一个可能的解决方案。我可以 运行 每个条件在一个单独的线程中,并同步它们对序列的单个共享枚举器的访问。所以我最终遇到了这个怪物:

public static bool AllConditions<TSource>(this IEnumerable<TSource> source,
    params Func<IEnumerable<TSource>, bool>[] conditions)
{
    var locker = new object();
    var enumerator = source.GetEnumerator();
    var barrier = new Barrier(conditions.Length);
    long index = -1;
    bool finished = false;

    IEnumerable<TSource> OneByOne()
    {
        try
        {
            while (true)
            {
                TSource current;
                lock (locker)
                {
                    if (finished) break;
                    if (barrier.CurrentPhaseNumber > index)
                    {
                        index = barrier.CurrentPhaseNumber;
                        finished = !enumerator.MoveNext();
                        if (finished)
                        {
                            enumerator.Dispose(); break;
                        }
                    }
                    current = enumerator.Current;
                }
                yield return current;
                barrier.SignalAndWait();
            }
        }
        finally
        {
            barrier.RemoveParticipant();
        }
    }

    var results = new ConcurrentQueue<bool>();
    var threads = conditions.Select(condition => new Thread(() =>
    {
        var result = condition(OneByOne());
        results.Enqueue(result);
    })
    { IsBackground = true }).ToArray();
    foreach (var thread in threads) thread.Start();
    foreach (var thread in threads) thread.Join();
    return results.All(r => r);
}

为了同步 a 使用了 Barrier。这个解决方案实际上比我想象的要好得多。它可以在我的机器上每秒处理近 1,000,000 个元素。但是它还不够快,因为它需要将近 3 个小时来处理 10,000,000,000 个元素的完整序列。我等不了超过 5 分钟的结果。关于如何在单个线程中有效地 运行 这些条件有什么想法吗?

如果您只想在一个枚举中在单个线程上检查这三个条件,我不会使用 LINQ 并手动聚合检查:

bool anyVerified = false;
bool allVerified = true;
double averageSoFar = 0;

foreach (int n in GetLongSequence()) {
    anyVerified = anyVerified || n % 28_413_803 == 0;
    allVerified = allVerified && n < 99_999_999;
    averageSoFar += n / 10_000_000_000;
    // Early out conditions here...
}
return anyVerified && allVerified && averageSoFar > 50_000_001;

如果您打算经常进行这些检查,这可以变得更通用,但看起来它可以满足您的所有要求。

如果需要保证序列只被枚举一次,对整个序列进行条件操作是没有用的。 我想到的一种可能性是为序列的每个元素调用一个接口,并根据您的特定条件以不同的方式实现该接口:

bool Example()
{
    var source = GetLongSequence();

    var conditions = new List<IEvaluate<int>>
    {
        new Any<int>(n => n % 28_413_803 == 0),
        new All<int>(n => n < 99_999_999),
        new Average(d => d > 50_000_001)
    };

    foreach (var item in source)
    {
        foreach (var condition in conditions)
        {
            condition.Evaluate(item);
        }
    }

    return conditions.All(c => c.Result);   
}

static IEnumerable<int> GetLongSequence()
{
    var random = new Random();
    for (long i = 0; i < 10_000_000_000; i++) yield return random.Next(0, 100_000_000);
}

interface IEvaluate<T>
{
    void Evaluate(T item);
    bool Result { get; }
}

class Any<T> : IEvaluate<T>
{
    private bool _result;
    private readonly Func<T, bool> _predicate;

    public Any(Func<T, bool> predicate)
    {
        _predicate = predicate;
        _result = false;
    }

    public void Evaluate(T item)
    {
        if (_predicate(item))
        {
            _result = true;
        }
    }

    public bool Result => _result;
}


class All<T> : IEvaluate<T>
{
    private bool _result;
    private readonly Func<T, bool> _predicate;

    public All(Func<T, bool> predicate)
    {
        _predicate = predicate;
        _result = true;
    }

    public void Evaluate(T item)
    {
        if (!_predicate(item))
        {
            _result = false;
        }
    }

    public bool Result => _result;
}

class Average : IEvaluate<int>
{
    private long _sum;
    private int _count;
    Func<double, bool> _evaluate;
    public Average(Func<double, bool> evaluate)
    {
    }

    public void Evaluate(int item)
    {
        _sum += item;
        _count++;
    }

    public bool Result => _evaluate((double)_sum / _count);
}

我还可以向您推荐另一种基于 Enumerable.Aggregate LINQ 扩展方法的方法吗?

public static class Parsing {
    public static bool ParseOnceAndCheck(this IEnumerable<int> collection, Func<int, bool> all, Func<int, bool> any, Func<double, bool> average) {
        // Aggregate the two boolean results, the sum of all values and the count of values...
        (bool allVerified, bool anyVerified, int sum, int count) = collection.Aggregate(
            ValueTuple.Create(true, false, 0, 0),
            (tuple, item) => ValueTuple.Create(tuple.Item1 && all(item), tuple.Item2 || any(item), tuple.Item3 + item, tuple.Item4 + 1)
        );
        // ... and summarizes the result
        return allVerified && anyVerified && average(sum / count);
    }
}

您可以使用与通常的 LINQ 方法非常相似的方式调用此扩展方法,但您的序列只有一个枚举:

IEnumerable<int> sequence = GetLongSequence();
bool result = sequence.ParseOnceAndCheck(
    all: n => n < 99_999_999,
    any: n => n % 28_413_803 == 0,
    average: a => a > 50_000_001
);

我找到了一个使用 Reactive Extensions library. On the one hand it's an excellent solution regarding features and ease of use, since all methods that are available in LINQ for IEnumerable are also available in RX for IObservable 的单线程解决方案。另一方面,它的性能有点令人失望,因为它和我在问题中提出的古怪的多线程解决方案一样慢。


更新: 我放弃了前两个实现(一个使用方法 Replay, the other using the method Publish) with a new one that uses the class Subject. This class is a low-level combination of an IObservable and IObserver. I am posting to it the items of the source IEnumerable, which are then propagated to all the IObservable<bool>'s provided by the caller. The performance is now decent, only 40% slower than Klaus Gütter's )。另外,如果条件(如 All)在枚举结束前可以判断为false。

public static bool AllConditions<TSource>(this IEnumerable<TSource> source,
    params Func<IObservable<TSource>, IObservable<bool>>[] conditions)
{
    var subject = new Subject<TSource>();
    var result = true;
    foreach (var condition in conditions)
    {
        condition(subject).SingleAsync().Subscribe(onNext: value =>
        {
            if (value) return;
            result = false;
        });
    }
    foreach (var item in source)
    {
        if (!result) break;
        subject.OnNext(item);
    }
    return result;
}

用法示例:

var result = source.AllConditions
(
    o => o.Any(n => n % 28_413_803 == 0),
    o => o.All(n => n < 99_999_999),
    o => o.Average(n => n).Select(v => v > 50_000_001)
);

每个条件都应该 return 一个包含单个布尔值的 IObservable。这不是 RX API 强制执行的,所以我使用 System.Reactive.Linq.SingleAsync 方法在运行时强制执行它(如果结果不符合此合同,则抛出异常)。