将两个无限的 C# IEnumerables 连接在一起,没有特定的顺序
Concatenate two infinite C# IEnumerables together in no particular order
我有两个方法,每个方法都 return 无限 IEnumerable
永无止境。我想连接它们,这样每当 IEnumerable
s return 中的任何一个值时,我都可以立即获取并处理它。
static void Main(string[] args)
{
var streamOfBoth = get1().Concat(get2());
foreach(var item in streamOfBoth)
{
Console.WriteLine(item);
// I'd expect mixed numbers 1 and 2
// Instead I receive only 1s
}
}
static IEnumerable<int> get1()
{
while (true)
{
System.Threading.Thread.Sleep(1000);
yield return 1;
}
}
static IEnumerable<int> get2()
{
while (true)
{
System.Threading.Thread.Sleep(200);
yield return 2;
}
}
有没有一种方法可以在不使用线程的情况下使用 IEnumerables 执行此操作?
您可能想要 交错 get1
和 get2
(从 get1
中获取项目,然后从 get2
中获取项目,然后再次来自 get1
,然后来自 get2
等)。广义的(IEnumerable<T>
不一定是无穷大且大小相同)Interleave<T>
扩展方法可以这样:
public static partial class EnumerableExtensions {
public static IEnumerable<T> Interleave<T>(this IEnumerable<T> source,
params IEnumerable<T>[] others) {
if (null == source)
throw new ArgumentNullException(nameof(source));
else if (null == others)
throw new ArgumentNullException(nameof(others));
IEnumerator<T>[] enums = new IEnumerator<T>[] { source.GetEnumerator() }
.Concat(others
.Where(item => item != null)
.Select(item => item.GetEnumerator()))
.ToArray();
try {
bool hasValue = true;
while (hasValue) {
hasValue = false;
for (int i = 0; i < enums.Length; ++i) {
if (enums[i] != null && enums[i].MoveNext()) {
hasValue = true;
yield return enums[i].Current;
}
else {
enums[i].Dispose();
enums[i] = null;
}
}
}
}
finally {
for (int i = enums.Length - 1; i >= 0; --i)
if (enums[i] != null)
enums[i].Dispose();
}
}
}
然后使用它:
var streamOfBoth = get1().Interleave(get2());
foreach(var item in streamOfBoth)
{
Console.WriteLine(item);
}
编辑: if
"whenever any ... return a value, I can instantly get and process"
是你问题中的关键短语你可以尝试BlockingCollection
并实施producer-consumer pattern:
static BlockingCollection<int> streamOfBoth = new BlockingCollection<int>();
// Producer #1
static void get1() {
while (true) {
System.Threading.Thread.Sleep(1000);
streamOfBoth.Add(1); // value (1) is ready and pushed into streamOfBoth
}
}
// Producer #2
static void get2() {
while (true) {
System.Threading.Thread.Sleep(200);
streamOfBoth.Add(2); // value (2) is ready and pushed into streamOfBoth
}
}
...
Task.Run(() => get1()); // Start producer #1
Task.Run(() => get2()); // Start producer #2
...
// Cosumer: when either Producer #1 or Producer #2 create a value
// consumer can starts process it
foreach(var item in streamOfBoth.GetConsumingEnumerable()) {
Console.WriteLine(item);
}
这很容易通过 System.Reactive
实现
static void Main()
{
get1().ToObservable(TaskPoolScheduler.Default).Subscribe(Print);
get2().ToObservable(TaskPoolScheduler.Default).Subscribe(Print);
}
static void Print(int i)
{
Console.WriteLine(i);
}
static IEnumerable<int> get1()
{
while (true)
{
System.Threading.Thread.Sleep(1000);
yield return 1;
}
}
static IEnumerable<int> get2()
{
while (true)
{
System.Threading.Thread.Sleep(200);
yield return 2;
}
}
这会在我的机器上产生以下输出:
2 2 2 2 1 2 2 2 2 2 1 2 2 2 2 2 1 2 2 2 2 2 1 2 2 2 2 2 1 2 2 2 2 2 1 ...
注意 ToObservable
是用参数 TaskPoolScheduler.Default
调用的;只调用 ToObservable
而没有它会导致同步执行,这意味着它将永远枚举第一个序列而永远不会到达第二个序列。
这是一个合并 IEnumerable
的通用 Merge
方法。每个 IEnumerable
在专用线程中枚举。
using System.Reactive.Linq;
using System.Reactive.Concurrency;
public static IEnumerable<T> Merge<T>(params IEnumerable<T>[] sources)
{
IEnumerable<IObservable<T>> observables = sources
.Select(source => source.ToObservable(NewThreadScheduler.Default));
IObservable<T> merged = Observable.Merge(observables);
return merged.ToEnumerable();
}
用法示例:
var streamOfBoth = Merge(get1(), get2());
枚举结果 IEnumerable
将阻塞当前线程,直到枚举完成。
此实现取决于 System.Reactive and System.Interactive.Async 包。
我有两个方法,每个方法都 return 无限 IEnumerable
永无止境。我想连接它们,这样每当 IEnumerable
s return 中的任何一个值时,我都可以立即获取并处理它。
static void Main(string[] args)
{
var streamOfBoth = get1().Concat(get2());
foreach(var item in streamOfBoth)
{
Console.WriteLine(item);
// I'd expect mixed numbers 1 and 2
// Instead I receive only 1s
}
}
static IEnumerable<int> get1()
{
while (true)
{
System.Threading.Thread.Sleep(1000);
yield return 1;
}
}
static IEnumerable<int> get2()
{
while (true)
{
System.Threading.Thread.Sleep(200);
yield return 2;
}
}
有没有一种方法可以在不使用线程的情况下使用 IEnumerables 执行此操作?
您可能想要 交错 get1
和 get2
(从 get1
中获取项目,然后从 get2
中获取项目,然后再次来自 get1
,然后来自 get2
等)。广义的(IEnumerable<T>
不一定是无穷大且大小相同)Interleave<T>
扩展方法可以这样:
public static partial class EnumerableExtensions {
public static IEnumerable<T> Interleave<T>(this IEnumerable<T> source,
params IEnumerable<T>[] others) {
if (null == source)
throw new ArgumentNullException(nameof(source));
else if (null == others)
throw new ArgumentNullException(nameof(others));
IEnumerator<T>[] enums = new IEnumerator<T>[] { source.GetEnumerator() }
.Concat(others
.Where(item => item != null)
.Select(item => item.GetEnumerator()))
.ToArray();
try {
bool hasValue = true;
while (hasValue) {
hasValue = false;
for (int i = 0; i < enums.Length; ++i) {
if (enums[i] != null && enums[i].MoveNext()) {
hasValue = true;
yield return enums[i].Current;
}
else {
enums[i].Dispose();
enums[i] = null;
}
}
}
}
finally {
for (int i = enums.Length - 1; i >= 0; --i)
if (enums[i] != null)
enums[i].Dispose();
}
}
}
然后使用它:
var streamOfBoth = get1().Interleave(get2());
foreach(var item in streamOfBoth)
{
Console.WriteLine(item);
}
编辑: if
"whenever any ... return a value, I can instantly get and process"
是你问题中的关键短语你可以尝试BlockingCollection
并实施producer-consumer pattern:
static BlockingCollection<int> streamOfBoth = new BlockingCollection<int>();
// Producer #1
static void get1() {
while (true) {
System.Threading.Thread.Sleep(1000);
streamOfBoth.Add(1); // value (1) is ready and pushed into streamOfBoth
}
}
// Producer #2
static void get2() {
while (true) {
System.Threading.Thread.Sleep(200);
streamOfBoth.Add(2); // value (2) is ready and pushed into streamOfBoth
}
}
...
Task.Run(() => get1()); // Start producer #1
Task.Run(() => get2()); // Start producer #2
...
// Cosumer: when either Producer #1 or Producer #2 create a value
// consumer can starts process it
foreach(var item in streamOfBoth.GetConsumingEnumerable()) {
Console.WriteLine(item);
}
这很容易通过 System.Reactive
static void Main()
{
get1().ToObservable(TaskPoolScheduler.Default).Subscribe(Print);
get2().ToObservable(TaskPoolScheduler.Default).Subscribe(Print);
}
static void Print(int i)
{
Console.WriteLine(i);
}
static IEnumerable<int> get1()
{
while (true)
{
System.Threading.Thread.Sleep(1000);
yield return 1;
}
}
static IEnumerable<int> get2()
{
while (true)
{
System.Threading.Thread.Sleep(200);
yield return 2;
}
}
这会在我的机器上产生以下输出:
2 2 2 2 1 2 2 2 2 2 1 2 2 2 2 2 1 2 2 2 2 2 1 2 2 2 2 2 1 2 2 2 2 2 1 ...
注意 ToObservable
是用参数 TaskPoolScheduler.Default
调用的;只调用 ToObservable
而没有它会导致同步执行,这意味着它将永远枚举第一个序列而永远不会到达第二个序列。
这是一个合并 IEnumerable
的通用 Merge
方法。每个 IEnumerable
在专用线程中枚举。
using System.Reactive.Linq;
using System.Reactive.Concurrency;
public static IEnumerable<T> Merge<T>(params IEnumerable<T>[] sources)
{
IEnumerable<IObservable<T>> observables = sources
.Select(source => source.ToObservable(NewThreadScheduler.Default));
IObservable<T> merged = Observable.Merge(observables);
return merged.ToEnumerable();
}
用法示例:
var streamOfBoth = Merge(get1(), get2());
枚举结果 IEnumerable
将阻塞当前线程,直到枚举完成。
此实现取决于 System.Reactive and System.Interactive.Async 包。