将热源 1 与冷源 2 合并
Merge hot source1 with a cold source2
source1 发出 A、B、C、D 等但从未完成
source2 发出 1,2 并完成
我想合并到 A1、B2、C1、D2 等
更新
我最初的尝试是按照 Theodor
的建议 Zip
和 Repeat
但这会创建一个锁,因为 source2 生成很昂贵。
Enigmativity
的最后一条评论解决了该问题
source1.Zip(source2.ToEnumerable().ToArray().Repeat())
因为您想无限期地重复 source2
并且您说它很冷(从某种意义上说,它每次都产生相同的一组值,并且通常以相同的节奏)并且它是昂贵,我们想把 IObservable<T>
变成 T[]
以确保它只计算一次。
var array = source2.ToEnumerable().ToArray();
var output = source1.Zip(array.Repeat(), (x, y) => (x, y));
假设理想的弹珠图是这样的:
Source1: +--------A-------B-------C--------D-------|
Source2: +----1--------------2--------|
Merged: +--------A1---------B2-------C1---D2------|
这是一个具有这种行为的 ZipWithRepeated
运算符:
static IObservable<(TFirst First, TSecond Second)> ZipWithRepeated<TFirst, TSecond>(
this IObservable<TFirst> first, IObservable<TSecond> second)
{
return second.Replay(replayed => first.ToAsyncEnumerable()
.Zip(replayed.ToAsyncEnumerable().Repeat())
.ToObservable());
}
用法示例:
var merged = source1.ZipWithRepeated(source2);
此解决方案需要依赖于 System.Linq.Async and System.Interactive.Async 包,因为两个序列在压缩之前都被转换为 IAsyncEnumerable<T>
s。
Alternative:与其依赖 Rx Replay
运算符来缓冲 source2 序列,更有效的解决方案是在转换后进行缓冲从可观察到异步可枚举。 AFAICS 在官方 Rx/Ix 库中没有对 replaying/memoizing IAsyncEnumerable<T>
s 的内置支持,但是创建带有嵌入式缓冲的自定义 Repeat
运算符并不是很困难。下面是 ZipWithRepeated
运算符的替代实现,它基于这个想法:
static IObservable<(TFirst First, TSecond Second)> ZipWithRepeated<TFirst, TSecond>(
this IObservable<TFirst> first, IObservable<TSecond> second)
{
return first.ToAsyncEnumerable()
.Zip(second.ToAsyncEnumerable().RepeatBuffered())
.ToObservable();
}
private async static IAsyncEnumerable<TSource> RepeatBuffered<TSource>(
this IAsyncEnumerable<TSource> source,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
var buffer = new List<TSource>();
await foreach (var item in source
.WithCancellation(cancellationToken).ConfigureAwait(false))
{
buffer.Add(item); yield return item;
}
while (true) foreach (var item in buffer) yield return item;
}
此实现不依赖于 System.Interactive.Async 包,而仅依赖于 System.Linq.Async 包。
source1 发出 A、B、C、D 等但从未完成
source2 发出 1,2 并完成
我想合并到 A1、B2、C1、D2 等
更新
我最初的尝试是按照 Theodor
的建议 Zip
和 Repeat
但这会创建一个锁,因为 source2 生成很昂贵。
Enigmativity
的最后一条评论解决了该问题
source1.Zip(source2.ToEnumerable().ToArray().Repeat())
因为您想无限期地重复 source2
并且您说它很冷(从某种意义上说,它每次都产生相同的一组值,并且通常以相同的节奏)并且它是昂贵,我们想把 IObservable<T>
变成 T[]
以确保它只计算一次。
var array = source2.ToEnumerable().ToArray();
var output = source1.Zip(array.Repeat(), (x, y) => (x, y));
假设理想的弹珠图是这样的:
Source1: +--------A-------B-------C--------D-------|
Source2: +----1--------------2--------|
Merged: +--------A1---------B2-------C1---D2------|
这是一个具有这种行为的 ZipWithRepeated
运算符:
static IObservable<(TFirst First, TSecond Second)> ZipWithRepeated<TFirst, TSecond>(
this IObservable<TFirst> first, IObservable<TSecond> second)
{
return second.Replay(replayed => first.ToAsyncEnumerable()
.Zip(replayed.ToAsyncEnumerable().Repeat())
.ToObservable());
}
用法示例:
var merged = source1.ZipWithRepeated(source2);
此解决方案需要依赖于 System.Linq.Async and System.Interactive.Async 包,因为两个序列在压缩之前都被转换为 IAsyncEnumerable<T>
s。
Alternative:与其依赖 Rx Replay
运算符来缓冲 source2 序列,更有效的解决方案是在转换后进行缓冲从可观察到异步可枚举。 AFAICS 在官方 Rx/Ix 库中没有对 replaying/memoizing IAsyncEnumerable<T>
s 的内置支持,但是创建带有嵌入式缓冲的自定义 Repeat
运算符并不是很困难。下面是 ZipWithRepeated
运算符的替代实现,它基于这个想法:
static IObservable<(TFirst First, TSecond Second)> ZipWithRepeated<TFirst, TSecond>(
this IObservable<TFirst> first, IObservable<TSecond> second)
{
return first.ToAsyncEnumerable()
.Zip(second.ToAsyncEnumerable().RepeatBuffered())
.ToObservable();
}
private async static IAsyncEnumerable<TSource> RepeatBuffered<TSource>(
this IAsyncEnumerable<TSource> source,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
var buffer = new List<TSource>();
await foreach (var item in source
.WithCancellation(cancellationToken).ConfigureAwait(false))
{
buffer.Add(item); yield return item;
}
while (true) foreach (var item in buffer) yield return item;
}
此实现不依赖于 System.Interactive.Async 包,而仅依赖于 System.Linq.Async 包。