将热源 1 与冷源 2 合并

Merge hot source1 with a cold source2

source1 发出 A、B、C、D 等但从未完成

source2 发出 1,2 并完成

我想合并到 A1、B2、C1、D2 等

更新

我最初的尝试是按照 Theodor 的建议 ZipRepeat 但这会创建一个锁,因为 source2 生成很昂贵。

Enigmativity 的最后一条评论解决了该问题

source1.Zip(source2.ToEnumerable().ToArray().Repeat())

因为您想无限期地重复 source2 并且您说它很冷(从某种意义上说,它每次都产生相同的一组值,并且通常以相同的节奏)并且它是昂贵,我们想把 IObservable<T> 变成 T[] 以确保它只计算一次。

var array = source2.ToEnumerable().ToArray();
var output = source1.Zip(array.Repeat(), (x, y) => (x, y));

假设理想的弹珠图是这样的:

Source1: +--------A-------B-------C--------D-------|
Source2: +----1--------------2--------|
Merged:  +--------A1---------B2-------C1---D2------|

这是一个具有这种行为的 ZipWithRepeated 运算符:

static IObservable<(TFirst First, TSecond Second)> ZipWithRepeated<TFirst, TSecond>(
    this IObservable<TFirst> first, IObservable<TSecond> second)
{
    return second.Replay(replayed => first.ToAsyncEnumerable()
        .Zip(replayed.ToAsyncEnumerable().Repeat())
        .ToObservable());
}

用法示例:

var merged = source1.ZipWithRepeated(source2);

此解决方案需要依赖于 System.Linq.Async and System.Interactive.Async 包,因为两个序列在​​压缩之前都被转换为 IAsyncEnumerable<T>s。


Alternative:与其依赖 Rx Replay 运算符来缓冲 source2 序列,更有效的解决方案是在转换后进行缓冲从可观察到异步可枚举。 AFAICS 在官方 Rx/Ix 库中没有对 replaying/memoizing IAsyncEnumerable<T>s 的内置支持,但是创建带有嵌入式缓冲的自定义 Repeat 运算符并不是很困难。下面是 ZipWithRepeated 运算符的替代实现,它基于这个想法:

static IObservable<(TFirst First, TSecond Second)> ZipWithRepeated<TFirst, TSecond>(
    this IObservable<TFirst> first, IObservable<TSecond> second)
{
    return first.ToAsyncEnumerable()
        .Zip(second.ToAsyncEnumerable().RepeatBuffered())
        .ToObservable();
}

private async static IAsyncEnumerable<TSource> RepeatBuffered<TSource>(
    this IAsyncEnumerable<TSource> source,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    var buffer = new List<TSource>();
    await foreach (var item in source
        .WithCancellation(cancellationToken).ConfigureAwait(false))
    {
        buffer.Add(item); yield return item;
    }
    while (true) foreach (var item in buffer) yield return item;
}

此实现不依赖于 System.Interactive.Async 包,而仅依赖于 System.Linq.Async 包。