在一次传递中多次使用 IEnumerable
Consuming an IEnumerable multiple times in one pass
是否可以编写一个高阶函数,使 IEnumerable
被多次使用,但只通过一次,而不将所有数据读入内存? [请参阅下面的编辑以了解我正在寻找的内容。]
例如,在下面的代码中,可枚举项是 mynums
(我在其上标记了 .Trace()
以查看我们枚举了多少次)。目标是弄清楚它是否有任何大于 5 的数字,以及所有数字的总和。处理可枚举两次的函数是 Both_TwoPass
,但它枚举了两次。相比之下 Both_NonStream
只枚举它一次,但以将其读入内存为代价。原则上,如 Any5Sum
所示,可以单次通过并以流方式执行这两项任务,但这是特定的解决方案。是否可以编写一个与 Both_*
具有相同签名的函数,但这是两全其美的方法?
(在我看来,这应该可以使用线程。是否有更好的解决方案,比如使用 async
?)
编辑
下面是关于我正在寻找的内容的说明。我所做的是在方括号中包含对每个 属性 的非常脚踏实地的描述。
我正在寻找具有以下特征的函数Both
:
- 它有签名
(S1, S2) Both<T, S1, S2>(this IEnumerable<T> tt, Func<IEnumerable<T>, S1>, Func<IEnumerable<T>, S2>)
(并产生 "right" 输出!)
- 它只迭代第一个参数
tt
一次。 [我的意思是,当传递 mynums
(定义如下)时,它只输出 mynums: 0 1 2 ...
一次。这排除了函数 Both_TwoPass
.]
- 它以流方式处理来自第一个参数
tt
的数据。 [我的意思是,例如,没有足够的内存来同时在内存中存储来自 tt
的所有项目,从而排除函数 Both_NonStream
。]
using System;
using System.Collections.Generic;
using System.Linq;
namespace ConsoleApp
{
static class Extensions
{
public static IEnumerable<T> Trace<T>(this IEnumerable<T> tt, string msg = "")
{
Console.Write(msg);
try
{
foreach (T t in tt)
{
Console.Write(" {0}", t);
yield return t;
}
}
finally
{
Console.WriteLine('.');
}
}
public static (S1, S2) Both_TwoPass<T, S1, S2>(this IEnumerable<T> tt, Func<IEnumerable<T>, S1> f1, Func<IEnumerable<T>, S2> f2)
{
return (f1(tt), f2(tt));
}
public static (S1, S2) Both_NonStream<T, S1, S2>(this IEnumerable<T> tt, Func<IEnumerable<T>, S1> f1, Func<IEnumerable<T>, S2> f2)
{
var tt2 = tt.ToList();
return (f1(tt2), f2(tt2));
}
public static (bool, int) Any5Sum(this IEnumerable<int> ii)
{
int sum = 0;
bool any5 = false;
foreach (int i in ii)
{
sum += i;
any5 |= i > 5; // or: if (!any5) any5 = i > 5;
}
return (any5, sum);
}
}
class Program
{
static void Main()
{
var mynums = Enumerable.Range(0, 10).Trace("mynums:");
Console.WriteLine("TwoPass: (any > 5, sum) = {0}", mynums.Both_TwoPass(tt => tt.Any(k => k > 5), tt => tt.Sum()));
Console.WriteLine("NonStream: (any > 5, sum) = {0}", mynums.Both_NonStream(tt => tt.Any(k => k > 5), tt => tt.Sum()));
Console.WriteLine("Manual: (any > 5, sum) = {0}", mynums.Any5Sum());
}
}
}
您编写计算模型的方式(即 return (f1(tt), f2(tt))
)无法避免枚举的多次迭代。您基本上是在说计算 Item1
然后计算 Item2
.
您必须将模型从 (Func<IEnumerable<T>, S1>, Func<IEnumerable<T>, S2>)
更改为 (Func<T, S1>, Func<T, S2>)
或更改为 Func<IEnumerable<T>, (S1, S2)>
才能运行 并行计算。
您实施的 Any5Sum
基本上是第二种方法 (Func<IEnumerable<T>, (S1, S2)>
)。但是已经有一个内置的方法。
试试这个:
Console.WriteLine("Aggregate: (any > 5, sum) = {0}",
mynums
.Aggregate<int, (bool any5, int sum)>(
(false, 0),
(a, x) => (a.any5 | x > 5, a.sum + x)));
这里的核心问题是谁负责调用Enumeration.MoveNext()
(例如通过使用foreach 循环)。跨线程同步多个 foreach 循环会很慢,而且很难正确处理。
实施IAsyncEnumerable<T>
,让多个await foreach
循环轮流处理物品会更容易。但是还是很傻
所以更简单的解决方案是更改问题。与其尝试调用同时尝试枚举项目的多个方法,不如更改界面以简单地访问每个项目。
我认为 and are describing the same thing in the comments. There is no need to create such a "special-purpose IEnumerable
", though, because the BlockingCollection<>
class 这种生产者-消费者场景已经存在。您将按如下方式使用它...
- 为每个消费函数创建一个
BlockingCollection<>
(即 tt1
和 tt2
)。
- 默认情况下,
BlockingCollection<>
包裹 ConcurrentQueue<>
,因此元素将按 FIFO 顺序到达。
- 为了满足一次只有一个元素保存在内存中的要求,将为 bounded capacity 指定
1
。请注意,此容量是针对每个集合的,因此对于两个集合,在任何给定时刻最多会有两个排队的元素。
- 每个集合都将保存该消费者的输入元素。
- 为每个消费函数创建一个thread/task。
- thread/task 将简单地为其输入集合调用
GetConsumingEnumerator()
,将结果 IEnumerable<>
传递给它的消费函数,然后 return 那个结果。
GetConsumingEnumerable()
就像它的名字所暗示的那样:它创建一个 IEnumerable<>
, 消耗 (删除)集合中的元素。如果集合为空,枚举将阻塞直到添加元素。 CompleteAdding()
在生产者完成后调用,这允许使用枚举器在集合清空时退出。
- 生产者枚举
IEnumerable<>
、tt
,并将每个元素添加到两个集合中。这是唯一一次 tt
被枚举。
如果集合已达到其容量,BlockingCollection<>.Add()
将阻塞,防止整个 tt
被缓冲在内存中。
- 一旦
tt
被完全枚举,CompleteAdding()
将在每个集合上调用。
- 一旦每个消费者 thread/task 完成,他们的结果就会 returned。
这是代码中的样子...
public static (S1, S2) Both<T, S1, S2>(this IEnumerable<T> tt, Func<IEnumerable<T>, S1> tt1, Func<IEnumerable<T>, S2> tt2)
{
const int MaxQueuedElementsPerCollection = 1;
using (BlockingCollection<T> collection1 = new BlockingCollection<T>(MaxQueuedElementsPerCollection))
using (Task<S1> task1 = StartConsumerTask(collection1, tt1))
using (BlockingCollection<T> collection2 = new BlockingCollection<T>(MaxQueuedElementsPerCollection))
using (Task<S2> task2 = StartConsumerTask(collection2, tt2))
{
foreach (T element in tt)
{
collection1.Add(element);
collection2.Add(element);
}
// Inform any enumerators created by .GetConsumingEnumerable()
// that there will be no more elements added.
collection1.CompleteAdding();
collection2.CompleteAdding();
// Accessing the Result property blocks until the Task<> is complete.
return (task1.Result, task2.Result);
}
Task<S> StartConsumerTask<S>(BlockingCollection<T> collection, Func<IEnumerable<T>, S> func)
{
return Task.Run(() => func(collection.GetConsumingEnumerable()));
}
}
请注意,为了提高效率,您可以将 MaxQueuedElementsPerCollection
增加到 10
或 100
,这样消费者就不必 运行彼此步调一致。
不过,此代码有一个问题。当集合为空时,消费者必须等待生产者生产一个元素,而当集合已满时,生产者必须等待消费者消费一个元素。考虑在执行 tt => tt.Any(k => k > 5)
lambda...
过程中发生了什么
- 生产者等待集合未满并添加
5
。
- 消费者等待集合非空并移除
5
。
5 > 5
returns false
并且枚举继续。
- 生产者等待集合未满并添加
6
。
- 消费者等待集合非空并移除
6
。
6 > 5
returns true
并且枚举停止。 Any()
、lambda 和消费者任务全部 return。
- 生产者等待集合未满并添加
7
。
- 生产者等待集合未满,但......从未发生过!
- 消费者已经放弃了枚举,所以它不会消耗任何元素来为新元素腾出空间。
Add()
永远不会 return.
我能想到的防止这种死锁的最干净的方法是确保枚举整个集合,即使 func
不这样做也是如此。这只需要对 StartConsumerTask<>()
local method...
进行简单更改
Task<S> StartConsumerTask<S>(BlockingCollection<T> collection, Func<IEnumerable<T>, S> func)
{
return Task.Run(
() => {
try
{
return func(collection.GetConsumingEnumerable());
}
finally
{
// Prevent BlockingCollection<>.Add() calls from
// deadlocking by ensuring the entire collection gets
// consumed even if func abandoned its enumeration early.
foreach (T element in collection.GetConsumingEnumerable())
{
// Do nothing...
}
}
}
);
}
这样做的缺点是 tt
将始终枚举完成,即使 both tt1
和 tt2
放弃了他们的枚举器早。
解决了这个问题,这...
static void Main()
{
IEnumerable<int> mynums = Enumerable.Range(0, 10).Trace("mynums:");
Console.WriteLine("Both: (any > 5, sum) = {0}", mynums.Both(tt => tt.Any(k => k > 5), tt => tt.Sum()));
}
...输出这个...
mynums: 0 1 2 3 4 5 6 7 8 9.
Both: (any > 5, sum) = (True, 45)
我相信有可能满足问题的所有要求,还有一个(非常自然的)要求,即如果两个 Func<IEnumerable<T>, S>
中的每一个都部分消耗原始可枚举,则只能部分枚举原始可枚举.
(@BACON 对此进行了讨论)。该方法在我的 GitHub repo 'CoEnumerable'. The idea is that the Barrier 中进行了更详细的讨论 class 提供了一种相当简单的方法来实现代理 IEnumerable
,每个 Func<IEnumerable<T>, S>
都可以在代理时使用该代理只消耗真实的 IEnumerable
一次。特别是,该实现只消耗了绝对必要的原始可枚举量(即满足上述额外要求)。
代理是:
class BarrierEnumerable<T> : IEnumerable<T>
{
private Barrier barrier;
private bool moveNext;
private readonly Func<T> src;
public BarrierEnumerable(IEnumerator<T> enumerator)
{
src = () => enumerator.Current;
}
public Barrier Barrier
{
set => barrier = value;
}
public bool MoveNext
{
set => moveNext = value;
}
public IEnumerator<T> GetEnumerator()
{
try
{
while (true)
{
barrier.SignalAndWait();
if (moveNext)
{
yield return src();
}
else
{
yield break;
}
}
}
finally
{
barrier.RemoveParticipant();
}
}
IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}
据此我们可以合并两个消费者
public static T Combine<S, T1, T2, T>(this IEnumerable<S> source,
Func<IEnumerable<S>, T1> coenumerable1,
Func<IEnumerable<S>, T2> coenumerable2,
Func<T1, T2, T> resultSelector)
{
using var ss = source.GetEnumerator();
var enumerable1 = new BarrierEnumerable<S>(ss);
var enumerable2 = new BarrierEnumerable<S>(ss);
using var barrier = new Barrier(2, _ => enumerable1.MoveNext = enumerable2.MoveNext = ss.MoveNext());
enumerable2.Barrier = enumerable1.Barrier = barrier;
using var t1 = Task.Run(() => coenumerable1(enumerable1));
using var t2 = Task.Run(() => coenumerable2(enumerable2));
return resultSelector(t1.Result, t2.Result);
}
GitHub 存储库有几个使用上述代码的示例,以及一些简短的设计讨论(包括限制)。
是否可以编写一个高阶函数,使 IEnumerable
被多次使用,但只通过一次,而不将所有数据读入内存? [请参阅下面的编辑以了解我正在寻找的内容。]
例如,在下面的代码中,可枚举项是 mynums
(我在其上标记了 .Trace()
以查看我们枚举了多少次)。目标是弄清楚它是否有任何大于 5 的数字,以及所有数字的总和。处理可枚举两次的函数是 Both_TwoPass
,但它枚举了两次。相比之下 Both_NonStream
只枚举它一次,但以将其读入内存为代价。原则上,如 Any5Sum
所示,可以单次通过并以流方式执行这两项任务,但这是特定的解决方案。是否可以编写一个与 Both_*
具有相同签名的函数,但这是两全其美的方法?
(在我看来,这应该可以使用线程。是否有更好的解决方案,比如使用 async
?)
编辑
下面是关于我正在寻找的内容的说明。我所做的是在方括号中包含对每个 属性 的非常脚踏实地的描述。
我正在寻找具有以下特征的函数Both
:
- 它有签名
(S1, S2) Both<T, S1, S2>(this IEnumerable<T> tt, Func<IEnumerable<T>, S1>, Func<IEnumerable<T>, S2>)
(并产生 "right" 输出!) - 它只迭代第一个参数
tt
一次。 [我的意思是,当传递mynums
(定义如下)时,它只输出mynums: 0 1 2 ...
一次。这排除了函数Both_TwoPass
.] - 它以流方式处理来自第一个参数
tt
的数据。 [我的意思是,例如,没有足够的内存来同时在内存中存储来自tt
的所有项目,从而排除函数Both_NonStream
。]
using System;
using System.Collections.Generic;
using System.Linq;
namespace ConsoleApp
{
static class Extensions
{
public static IEnumerable<T> Trace<T>(this IEnumerable<T> tt, string msg = "")
{
Console.Write(msg);
try
{
foreach (T t in tt)
{
Console.Write(" {0}", t);
yield return t;
}
}
finally
{
Console.WriteLine('.');
}
}
public static (S1, S2) Both_TwoPass<T, S1, S2>(this IEnumerable<T> tt, Func<IEnumerable<T>, S1> f1, Func<IEnumerable<T>, S2> f2)
{
return (f1(tt), f2(tt));
}
public static (S1, S2) Both_NonStream<T, S1, S2>(this IEnumerable<T> tt, Func<IEnumerable<T>, S1> f1, Func<IEnumerable<T>, S2> f2)
{
var tt2 = tt.ToList();
return (f1(tt2), f2(tt2));
}
public static (bool, int) Any5Sum(this IEnumerable<int> ii)
{
int sum = 0;
bool any5 = false;
foreach (int i in ii)
{
sum += i;
any5 |= i > 5; // or: if (!any5) any5 = i > 5;
}
return (any5, sum);
}
}
class Program
{
static void Main()
{
var mynums = Enumerable.Range(0, 10).Trace("mynums:");
Console.WriteLine("TwoPass: (any > 5, sum) = {0}", mynums.Both_TwoPass(tt => tt.Any(k => k > 5), tt => tt.Sum()));
Console.WriteLine("NonStream: (any > 5, sum) = {0}", mynums.Both_NonStream(tt => tt.Any(k => k > 5), tt => tt.Sum()));
Console.WriteLine("Manual: (any > 5, sum) = {0}", mynums.Any5Sum());
}
}
}
您编写计算模型的方式(即 return (f1(tt), f2(tt))
)无法避免枚举的多次迭代。您基本上是在说计算 Item1
然后计算 Item2
.
您必须将模型从 (Func<IEnumerable<T>, S1>, Func<IEnumerable<T>, S2>)
更改为 (Func<T, S1>, Func<T, S2>)
或更改为 Func<IEnumerable<T>, (S1, S2)>
才能运行 并行计算。
您实施的 Any5Sum
基本上是第二种方法 (Func<IEnumerable<T>, (S1, S2)>
)。但是已经有一个内置的方法。
试试这个:
Console.WriteLine("Aggregate: (any > 5, sum) = {0}",
mynums
.Aggregate<int, (bool any5, int sum)>(
(false, 0),
(a, x) => (a.any5 | x > 5, a.sum + x)));
这里的核心问题是谁负责调用Enumeration.MoveNext()
(例如通过使用foreach 循环)。跨线程同步多个 foreach 循环会很慢,而且很难正确处理。
实施IAsyncEnumerable<T>
,让多个await foreach
循环轮流处理物品会更容易。但是还是很傻
所以更简单的解决方案是更改问题。与其尝试调用同时尝试枚举项目的多个方法,不如更改界面以简单地访问每个项目。
我认为 IEnumerable
", though, because the BlockingCollection<>
class 这种生产者-消费者场景已经存在。您将按如下方式使用它...
- 为每个消费函数创建一个
BlockingCollection<>
(即tt1
和tt2
)。- 默认情况下,
BlockingCollection<>
包裹ConcurrentQueue<>
,因此元素将按 FIFO 顺序到达。 - 为了满足一次只有一个元素保存在内存中的要求,将为 bounded capacity 指定
1
。请注意,此容量是针对每个集合的,因此对于两个集合,在任何给定时刻最多会有两个排队的元素。 - 每个集合都将保存该消费者的输入元素。
- 默认情况下,
- 为每个消费函数创建一个thread/task。
- thread/task 将简单地为其输入集合调用
GetConsumingEnumerator()
,将结果IEnumerable<>
传递给它的消费函数,然后 return 那个结果。GetConsumingEnumerable()
就像它的名字所暗示的那样:它创建一个IEnumerable<>
, 消耗 (删除)集合中的元素。如果集合为空,枚举将阻塞直到添加元素。CompleteAdding()
在生产者完成后调用,这允许使用枚举器在集合清空时退出。
- thread/task 将简单地为其输入集合调用
- 生产者枚举
IEnumerable<>
、tt
,并将每个元素添加到两个集合中。这是唯一一次tt
被枚举。-
如果集合已达到其容量,
BlockingCollection<>.Add()
将阻塞,防止整个tt
被缓冲在内存中。
- 一旦
tt
被完全枚举,CompleteAdding()
将在每个集合上调用。 - 一旦每个消费者 thread/task 完成,他们的结果就会 returned。
这是代码中的样子...
public static (S1, S2) Both<T, S1, S2>(this IEnumerable<T> tt, Func<IEnumerable<T>, S1> tt1, Func<IEnumerable<T>, S2> tt2)
{
const int MaxQueuedElementsPerCollection = 1;
using (BlockingCollection<T> collection1 = new BlockingCollection<T>(MaxQueuedElementsPerCollection))
using (Task<S1> task1 = StartConsumerTask(collection1, tt1))
using (BlockingCollection<T> collection2 = new BlockingCollection<T>(MaxQueuedElementsPerCollection))
using (Task<S2> task2 = StartConsumerTask(collection2, tt2))
{
foreach (T element in tt)
{
collection1.Add(element);
collection2.Add(element);
}
// Inform any enumerators created by .GetConsumingEnumerable()
// that there will be no more elements added.
collection1.CompleteAdding();
collection2.CompleteAdding();
// Accessing the Result property blocks until the Task<> is complete.
return (task1.Result, task2.Result);
}
Task<S> StartConsumerTask<S>(BlockingCollection<T> collection, Func<IEnumerable<T>, S> func)
{
return Task.Run(() => func(collection.GetConsumingEnumerable()));
}
}
请注意,为了提高效率,您可以将 MaxQueuedElementsPerCollection
增加到 10
或 100
,这样消费者就不必 运行彼此步调一致。
不过,此代码有一个问题。当集合为空时,消费者必须等待生产者生产一个元素,而当集合已满时,生产者必须等待消费者消费一个元素。考虑在执行 tt => tt.Any(k => k > 5)
lambda...
- 生产者等待集合未满并添加
5
。 - 消费者等待集合非空并移除
5
。5 > 5
returnsfalse
并且枚举继续。
- 生产者等待集合未满并添加
6
。 - 消费者等待集合非空并移除
6
。6 > 5
returnstrue
并且枚举停止。Any()
、lambda 和消费者任务全部 return。
- 生产者等待集合未满并添加
7
。 - 生产者等待集合未满,但......从未发生过!
- 消费者已经放弃了枚举,所以它不会消耗任何元素来为新元素腾出空间。
Add()
永远不会 return.
- 消费者已经放弃了枚举,所以它不会消耗任何元素来为新元素腾出空间。
我能想到的防止这种死锁的最干净的方法是确保枚举整个集合,即使 func
不这样做也是如此。这只需要对 StartConsumerTask<>()
local method...
Task<S> StartConsumerTask<S>(BlockingCollection<T> collection, Func<IEnumerable<T>, S> func)
{
return Task.Run(
() => {
try
{
return func(collection.GetConsumingEnumerable());
}
finally
{
// Prevent BlockingCollection<>.Add() calls from
// deadlocking by ensuring the entire collection gets
// consumed even if func abandoned its enumeration early.
foreach (T element in collection.GetConsumingEnumerable())
{
// Do nothing...
}
}
}
);
}
这样做的缺点是 tt
将始终枚举完成,即使 both tt1
和 tt2
放弃了他们的枚举器早。
解决了这个问题,这...
static void Main()
{
IEnumerable<int> mynums = Enumerable.Range(0, 10).Trace("mynums:");
Console.WriteLine("Both: (any > 5, sum) = {0}", mynums.Both(tt => tt.Any(k => k > 5), tt => tt.Sum()));
}
...输出这个...
mynums: 0 1 2 3 4 5 6 7 8 9.
Both: (any > 5, sum) = (True, 45)
我相信有可能满足问题的所有要求,还有一个(非常自然的)要求,即如果两个 Func<IEnumerable<T>, S>
中的每一个都部分消耗原始可枚举,则只能部分枚举原始可枚举.
(@BACON 对此进行了讨论)。该方法在我的 GitHub repo 'CoEnumerable'. The idea is that the Barrier 中进行了更详细的讨论 class 提供了一种相当简单的方法来实现代理 IEnumerable
,每个 Func<IEnumerable<T>, S>
都可以在代理时使用该代理只消耗真实的 IEnumerable
一次。特别是,该实现只消耗了绝对必要的原始可枚举量(即满足上述额外要求)。
代理是:
class BarrierEnumerable<T> : IEnumerable<T>
{
private Barrier barrier;
private bool moveNext;
private readonly Func<T> src;
public BarrierEnumerable(IEnumerator<T> enumerator)
{
src = () => enumerator.Current;
}
public Barrier Barrier
{
set => barrier = value;
}
public bool MoveNext
{
set => moveNext = value;
}
public IEnumerator<T> GetEnumerator()
{
try
{
while (true)
{
barrier.SignalAndWait();
if (moveNext)
{
yield return src();
}
else
{
yield break;
}
}
}
finally
{
barrier.RemoveParticipant();
}
}
IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}
据此我们可以合并两个消费者
public static T Combine<S, T1, T2, T>(this IEnumerable<S> source,
Func<IEnumerable<S>, T1> coenumerable1,
Func<IEnumerable<S>, T2> coenumerable2,
Func<T1, T2, T> resultSelector)
{
using var ss = source.GetEnumerator();
var enumerable1 = new BarrierEnumerable<S>(ss);
var enumerable2 = new BarrierEnumerable<S>(ss);
using var barrier = new Barrier(2, _ => enumerable1.MoveNext = enumerable2.MoveNext = ss.MoveNext());
enumerable2.Barrier = enumerable1.Barrier = barrier;
using var t1 = Task.Run(() => coenumerable1(enumerable1));
using var t2 = Task.Run(() => coenumerable2(enumerable2));
return resultSelector(t1.Result, t2.Result);
}
GitHub 存储库有几个使用上述代码的示例,以及一些简短的设计讨论(包括限制)。