如何合并一个嵌套的 observable IObservable<IObservable<T>> 并发受限,缓冲容量受限?
How to merge a nested observable IObservable<IObservable<T>> with limited concurrency and limited buffer capacity?
我注意到 Rx Merge
运算符接受一个可选的 maxConcurrent
参数。这可用于通过同时订阅有限数量的子序列来限制最大并发性。当新子序列的推送速度低于订阅子序列的完成速度时,它可以完美地工作,但当新子序列的推送速度比订阅子序列的完成速度快时,它就会出现问题。发生的情况是子序列被缓冲在一个内部缓冲区中,其大小不断增加,而且当前订阅的子序列变得越来越旧。下面是这个问题的演示:
await Observable
.Generate(0, _ => true, x => x, x => x, _ => TimeSpan.FromMilliseconds(10))
.Select(_ => Observable
.Return(DateTime.Now)
.Do(d => Console.WriteLine(
$"Then: {d:HH:mm:ss.fff}, " +
$"Now: {DateTime.Now:HH:mm:ss.fff}, " +
$"TotalMemory: {GC.GetTotalMemory(true):#,0} bytes"))
.Delay(TimeSpan.FromMilliseconds(1000)))
.Merge(maxConcurrent: 1)
.Take(10);
每10毫秒推送一个新的子序列,每个子序列在1000毫秒后完成。子序列以最大并发1合并(顺序)。
输出:
Then: 12:45:34.019, Now: 12:45:34.054, TotalMemory: 117,040 bytes
Then: 12:45:34.082, Now: 12:45:35.088, TotalMemory: 139,336 bytes
Then: 12:45:34.093, Now: 12:45:36.094, TotalMemory: 146,336 bytes
Then: 12:45:34.114, Now: 12:45:37.098, TotalMemory: 153,216 bytes
Then: 12:45:34.124, Now: 12:45:38.109, TotalMemory: 159,272 bytes
Then: 12:45:34.145, Now: 12:45:39.126, TotalMemory: 167,608 bytes
Then: 12:45:34.156, Now: 12:45:40.141, TotalMemory: 173,952 bytes
Then: 12:45:34.177, Now: 12:45:41.147, TotalMemory: 180,432 bytes
Then: 12:45:34.188, Now: 12:45:42.164, TotalMemory: 186,808 bytes
Then: 12:45:34.209, Now: 12:45:43.175, TotalMemory: 197,208 bytes
内存使用量稳步增长,每个子序列的创建和订阅之间的时间间隔也在增长。
我想要的是一个自定义 Merge
变体,它有一个大小有限的内部缓冲区。当缓冲区已满时,任何传入的子序列都应该导致当前最旧的缓冲子序列被丢弃。这是理想行为的大理石图,配置为最大并发 = 1 和缓冲区容量 = 1:
Source: +----A------B------C------|
A: +-------a----a---|
B: not-subscribed
C: +-----c----|
Result: +------------a----a---------c----|
- 子序列 A 在发出后立即被订阅。
- 然后 B 被发出并存储在缓冲区中,因为 A 还没有完成。
- 然后C被发出并替换了缓冲区中的B。结果,B 子序列被丢弃并且从未被订阅。
- 子序列 A 完成后立即订阅缓冲的子序列 C。
- 最终结果包含 A 和 C 子序列发出的合并值。
如何实现具有这种特定行为的自定义 Rx 运算符?这是我要实现的运算符的存根:
public static IObservable<T> MergeBounded<T>(
this IObservable<IObservable<T>> source,
int maximumConcurrency,
int boundedCapacity)
{
return source.Merge(maximumConcurrency);
// TODO: enforce the boundedCapacity policy somehow
}
我想出了一个实用的解决方案,我不确定它是否可行,只是因为它很复杂。但我想我涵盖了所有基础。
首先,如果采用函数式方法,这是一个相对简单的状态机问题:状态需要知道当前有多少可观察对象正在执行以及缓冲区队列。可以影响状态的两个事件是一个新的 Observable 进入缓冲队列(导致缓冲队列入队),或者当前正在执行的 observable 终止(导致缓冲队列出队)。
由于状态机基本上意味着 Scan
,而 Scan
只能用于一种类型,我们必须将我们的两个事件强制转换为一种类型,我称之为 Message
以下。然后状态机知道所有并可以完成 Merge(n)
重载的工作。
最后一个技巧是回环:由于完成的 Observable 是 Scan
中的 'downstream',我们需要 'loop-back' 将那个 Observable 终止到 Scan
.为此,我总是参考 [this answer][1].
中的 Drain
函数
public static class X
{
public static IObservable<T> MergeBounded<T>(
this IObservable<IObservable<T>> source,
int maximumConcurrency,
int boundedCapacity)
{
return Observable.Defer(() =>
{
var capacityQueue = new Subject<Unit>();
var toReturn = source.Publish(_source => _source
.Select(o => Message.Enqueue(o))
.Merge(capacityQueue.Select(_ => Message.Dequeue(Observable.Empty<T>())))
.Scan((bufferCount: 0, buffer: ImmutableQueue<IObservable<T>>.Empty, executionCount: 0, item: (IObservable<T>)null), (state, message) =>
{
var buffer = state.buffer;
var bufferCount = state.bufferCount;
var executionCount = state.executionCount;
if (message.IsEnqueue)
{
if (executionCount < maximumConcurrency)
return (0, ImmutableQueue<IObservable<T>>.Empty, executionCount + 1, message.Object);
buffer = buffer.Enqueue(message.Object);
if (bufferCount == boundedCapacity)
buffer = buffer.Dequeue();
else
bufferCount++;
return (bufferCount, buffer, executionCount, null);
}
else
{
if (bufferCount == 0)
return (0, buffer, executionCount - 1, null);
else
return (bufferCount - 1, buffer.Dequeue(), executionCount, buffer.Peek());
}
})
.Where(t => t.item != null)
.Select(t => t.item)
.Select(o => o.Do(_ => { }, () => capacityQueue.OnNext(Unit.Default)))
.TakeUntil(_source.IgnoreElements().Materialize())
.Merge()
);
return toReturn;
});
}
public class Message
{
public static Message<T> Enqueue<T>(T t)
{
return Message<T>.Enqueue(t);
}
public static Message<T> Dequeue<T>(T t)
{
return Message<T>.Dequeue(t);
}
}
public class Message<T>
{
private readonly T _t;
private readonly bool _isEnqueue;
private Message(bool isEnqueue, T t)
{
_t = t;
_isEnqueue = isEnqueue;
}
public static Message<T> Enqueue(T t)
{
return new Message<T>(true, t);
}
public static Message<T> Dequeue(T t)
{
return new Message<T>(false, t);
}
public bool IsEnqueue => _isEnqueue;
public T Object => _t;
}
}
我写了一些测试代码(基于原始问题)来验证,如果你想借用它。测试正在通过:
// T: 0123456789012345678901234567890123
// T10: 0 1 2 3
// Source: +----A------B------C------|
// A: +-------a----a---|
// B: +----------b----b---|
// C: +--------c----|
// ExpectedResult: +------------a----a---------c----|
var ts = new TestScheduler();
var A = ts.CreateHotObservable(
ReactiveTest.OnNext(13 * TimeSpan.TicksPerSecond, "a"),
ReactiveTest.OnNext(18 * TimeSpan.TicksPerSecond, "a"),
ReactiveTest.OnCompleted<string>(22 * TimeSpan.TicksPerSecond)
);
var B = ts.CreateHotObservable(
ReactiveTest.OnNext(23 * TimeSpan.TicksPerSecond, "b"),
ReactiveTest.OnNext(28 * TimeSpan.TicksPerSecond, "b"),
ReactiveTest.OnCompleted<string>(32 * TimeSpan.TicksPerSecond)
);
var C = ts.CreateHotObservable(
ReactiveTest.OnNext(28 * TimeSpan.TicksPerSecond, "c"),
ReactiveTest.OnCompleted<string>(33 * TimeSpan.TicksPerSecond)
);
var source = ts.CreateHotObservable(
ReactiveTest.OnNext(5 * TimeSpan.TicksPerSecond, A.AsObservable()),
ReactiveTest.OnNext(12 * TimeSpan.TicksPerSecond, B.AsObservable()),
ReactiveTest.OnNext(19 * TimeSpan.TicksPerSecond, C.AsObservable()),
ReactiveTest.OnCompleted<IObservable<string>>(26 * TimeSpan.TicksPerSecond)
);
var observer = ts.CreateObserver<string>();
var testResult = source.MergeBounded(1, 1);
testResult.Subscribe(observer);
var expected = ts.CreateHotObservable(
ReactiveTest.OnNext(13 * TimeSpan.TicksPerSecond, "a"),
ReactiveTest.OnNext(18 * TimeSpan.TicksPerSecond, "a"),
ReactiveTest.OnNext(28 * TimeSpan.TicksPerSecond, "c"),
ReactiveTest.OnCompleted<string>(33 * TimeSpan.TicksPerSecond)
);
ts.Start();
//observer.Messages.Dump("Actual"); // Linqpad
//expected.Messages.Dump("Expected"); // Linqpad
ReactiveAssert.AreElementsEqual(expected.Messages, observer.Messages);
(测试代码无异常通过)
这是另一种实现方式。它不像 Shlomo 的 那样功能完备,因为它不能用 boundedCapacity: 0
配置。内部缓冲区的大小必须至少为 1。
/// <summary>
/// Merges elements from all inner observable sequences into a single observable
/// sequence, limiting the number of concurrent subscriptions to inner sequences.
/// The unsubscribed inner sequences are stored in a buffer with the specified
/// maximum capacity. When the buffer is full, the oldest inner sequence in the
/// buffer is dropped and ignored in order to make room for the latest inner
/// sequence.
/// </summary>
public static IObservable<T> MergeBounded<T>(
this IObservable<IObservable<T>> source,
int maximumConcurrency, int boundedCapacity)
{
if (boundedCapacity < 1)
throw new ArgumentOutOfRangeException(nameof(boundedCapacity));
return Observable.Defer(() =>
{
var queue = new Queue<IObservable<T>>(boundedCapacity);
return source
.Select(inner =>
{
bool oldestDropped = false;
lock (queue)
{
if (queue.Count == boundedCapacity)
{
queue.Dequeue(); oldestDropped = true;
}
queue.Enqueue(inner);
}
if (oldestDropped) return null;
return Observable.Defer(() =>
{
lock (queue) return queue.Dequeue();
});
})
.Where(inner => inner != null)
.Merge(maximumConcurrency);
});
}
此实现基于以下假设:内置 Merge
运算符从不订阅同一子序列两次。否则语句 queue.Dequeue()
可能会在空的 queue
上调用,并导致异常。
我注意到 Rx Merge
运算符接受一个可选的 maxConcurrent
参数。这可用于通过同时订阅有限数量的子序列来限制最大并发性。当新子序列的推送速度低于订阅子序列的完成速度时,它可以完美地工作,但当新子序列的推送速度比订阅子序列的完成速度快时,它就会出现问题。发生的情况是子序列被缓冲在一个内部缓冲区中,其大小不断增加,而且当前订阅的子序列变得越来越旧。下面是这个问题的演示:
await Observable
.Generate(0, _ => true, x => x, x => x, _ => TimeSpan.FromMilliseconds(10))
.Select(_ => Observable
.Return(DateTime.Now)
.Do(d => Console.WriteLine(
$"Then: {d:HH:mm:ss.fff}, " +
$"Now: {DateTime.Now:HH:mm:ss.fff}, " +
$"TotalMemory: {GC.GetTotalMemory(true):#,0} bytes"))
.Delay(TimeSpan.FromMilliseconds(1000)))
.Merge(maxConcurrent: 1)
.Take(10);
每10毫秒推送一个新的子序列,每个子序列在1000毫秒后完成。子序列以最大并发1合并(顺序)。
输出:
Then: 12:45:34.019, Now: 12:45:34.054, TotalMemory: 117,040 bytes
Then: 12:45:34.082, Now: 12:45:35.088, TotalMemory: 139,336 bytes
Then: 12:45:34.093, Now: 12:45:36.094, TotalMemory: 146,336 bytes
Then: 12:45:34.114, Now: 12:45:37.098, TotalMemory: 153,216 bytes
Then: 12:45:34.124, Now: 12:45:38.109, TotalMemory: 159,272 bytes
Then: 12:45:34.145, Now: 12:45:39.126, TotalMemory: 167,608 bytes
Then: 12:45:34.156, Now: 12:45:40.141, TotalMemory: 173,952 bytes
Then: 12:45:34.177, Now: 12:45:41.147, TotalMemory: 180,432 bytes
Then: 12:45:34.188, Now: 12:45:42.164, TotalMemory: 186,808 bytes
Then: 12:45:34.209, Now: 12:45:43.175, TotalMemory: 197,208 bytes
内存使用量稳步增长,每个子序列的创建和订阅之间的时间间隔也在增长。
我想要的是一个自定义 Merge
变体,它有一个大小有限的内部缓冲区。当缓冲区已满时,任何传入的子序列都应该导致当前最旧的缓冲子序列被丢弃。这是理想行为的大理石图,配置为最大并发 = 1 和缓冲区容量 = 1:
Source: +----A------B------C------|
A: +-------a----a---|
B: not-subscribed
C: +-----c----|
Result: +------------a----a---------c----|
- 子序列 A 在发出后立即被订阅。
- 然后 B 被发出并存储在缓冲区中,因为 A 还没有完成。
- 然后C被发出并替换了缓冲区中的B。结果,B 子序列被丢弃并且从未被订阅。
- 子序列 A 完成后立即订阅缓冲的子序列 C。
- 最终结果包含 A 和 C 子序列发出的合并值。
如何实现具有这种特定行为的自定义 Rx 运算符?这是我要实现的运算符的存根:
public static IObservable<T> MergeBounded<T>(
this IObservable<IObservable<T>> source,
int maximumConcurrency,
int boundedCapacity)
{
return source.Merge(maximumConcurrency);
// TODO: enforce the boundedCapacity policy somehow
}
我想出了一个实用的解决方案,我不确定它是否可行,只是因为它很复杂。但我想我涵盖了所有基础。
首先,如果采用函数式方法,这是一个相对简单的状态机问题:状态需要知道当前有多少可观察对象正在执行以及缓冲区队列。可以影响状态的两个事件是一个新的 Observable 进入缓冲队列(导致缓冲队列入队),或者当前正在执行的 observable 终止(导致缓冲队列出队)。
由于状态机基本上意味着 Scan
,而 Scan
只能用于一种类型,我们必须将我们的两个事件强制转换为一种类型,我称之为 Message
以下。然后状态机知道所有并可以完成 Merge(n)
重载的工作。
最后一个技巧是回环:由于完成的 Observable 是 Scan
中的 'downstream',我们需要 'loop-back' 将那个 Observable 终止到 Scan
.为此,我总是参考 [this answer][1].
Drain
函数
public static class X
{
public static IObservable<T> MergeBounded<T>(
this IObservable<IObservable<T>> source,
int maximumConcurrency,
int boundedCapacity)
{
return Observable.Defer(() =>
{
var capacityQueue = new Subject<Unit>();
var toReturn = source.Publish(_source => _source
.Select(o => Message.Enqueue(o))
.Merge(capacityQueue.Select(_ => Message.Dequeue(Observable.Empty<T>())))
.Scan((bufferCount: 0, buffer: ImmutableQueue<IObservable<T>>.Empty, executionCount: 0, item: (IObservable<T>)null), (state, message) =>
{
var buffer = state.buffer;
var bufferCount = state.bufferCount;
var executionCount = state.executionCount;
if (message.IsEnqueue)
{
if (executionCount < maximumConcurrency)
return (0, ImmutableQueue<IObservable<T>>.Empty, executionCount + 1, message.Object);
buffer = buffer.Enqueue(message.Object);
if (bufferCount == boundedCapacity)
buffer = buffer.Dequeue();
else
bufferCount++;
return (bufferCount, buffer, executionCount, null);
}
else
{
if (bufferCount == 0)
return (0, buffer, executionCount - 1, null);
else
return (bufferCount - 1, buffer.Dequeue(), executionCount, buffer.Peek());
}
})
.Where(t => t.item != null)
.Select(t => t.item)
.Select(o => o.Do(_ => { }, () => capacityQueue.OnNext(Unit.Default)))
.TakeUntil(_source.IgnoreElements().Materialize())
.Merge()
);
return toReturn;
});
}
public class Message
{
public static Message<T> Enqueue<T>(T t)
{
return Message<T>.Enqueue(t);
}
public static Message<T> Dequeue<T>(T t)
{
return Message<T>.Dequeue(t);
}
}
public class Message<T>
{
private readonly T _t;
private readonly bool _isEnqueue;
private Message(bool isEnqueue, T t)
{
_t = t;
_isEnqueue = isEnqueue;
}
public static Message<T> Enqueue(T t)
{
return new Message<T>(true, t);
}
public static Message<T> Dequeue(T t)
{
return new Message<T>(false, t);
}
public bool IsEnqueue => _isEnqueue;
public T Object => _t;
}
}
我写了一些测试代码(基于原始问题)来验证,如果你想借用它。测试正在通过:
// T: 0123456789012345678901234567890123
// T10: 0 1 2 3
// Source: +----A------B------C------|
// A: +-------a----a---|
// B: +----------b----b---|
// C: +--------c----|
// ExpectedResult: +------------a----a---------c----|
var ts = new TestScheduler();
var A = ts.CreateHotObservable(
ReactiveTest.OnNext(13 * TimeSpan.TicksPerSecond, "a"),
ReactiveTest.OnNext(18 * TimeSpan.TicksPerSecond, "a"),
ReactiveTest.OnCompleted<string>(22 * TimeSpan.TicksPerSecond)
);
var B = ts.CreateHotObservable(
ReactiveTest.OnNext(23 * TimeSpan.TicksPerSecond, "b"),
ReactiveTest.OnNext(28 * TimeSpan.TicksPerSecond, "b"),
ReactiveTest.OnCompleted<string>(32 * TimeSpan.TicksPerSecond)
);
var C = ts.CreateHotObservable(
ReactiveTest.OnNext(28 * TimeSpan.TicksPerSecond, "c"),
ReactiveTest.OnCompleted<string>(33 * TimeSpan.TicksPerSecond)
);
var source = ts.CreateHotObservable(
ReactiveTest.OnNext(5 * TimeSpan.TicksPerSecond, A.AsObservable()),
ReactiveTest.OnNext(12 * TimeSpan.TicksPerSecond, B.AsObservable()),
ReactiveTest.OnNext(19 * TimeSpan.TicksPerSecond, C.AsObservable()),
ReactiveTest.OnCompleted<IObservable<string>>(26 * TimeSpan.TicksPerSecond)
);
var observer = ts.CreateObserver<string>();
var testResult = source.MergeBounded(1, 1);
testResult.Subscribe(observer);
var expected = ts.CreateHotObservable(
ReactiveTest.OnNext(13 * TimeSpan.TicksPerSecond, "a"),
ReactiveTest.OnNext(18 * TimeSpan.TicksPerSecond, "a"),
ReactiveTest.OnNext(28 * TimeSpan.TicksPerSecond, "c"),
ReactiveTest.OnCompleted<string>(33 * TimeSpan.TicksPerSecond)
);
ts.Start();
//observer.Messages.Dump("Actual"); // Linqpad
//expected.Messages.Dump("Expected"); // Linqpad
ReactiveAssert.AreElementsEqual(expected.Messages, observer.Messages);
(测试代码无异常通过)
这是另一种实现方式。它不像 Shlomo 的 boundedCapacity: 0
配置。内部缓冲区的大小必须至少为 1。
/// <summary>
/// Merges elements from all inner observable sequences into a single observable
/// sequence, limiting the number of concurrent subscriptions to inner sequences.
/// The unsubscribed inner sequences are stored in a buffer with the specified
/// maximum capacity. When the buffer is full, the oldest inner sequence in the
/// buffer is dropped and ignored in order to make room for the latest inner
/// sequence.
/// </summary>
public static IObservable<T> MergeBounded<T>(
this IObservable<IObservable<T>> source,
int maximumConcurrency, int boundedCapacity)
{
if (boundedCapacity < 1)
throw new ArgumentOutOfRangeException(nameof(boundedCapacity));
return Observable.Defer(() =>
{
var queue = new Queue<IObservable<T>>(boundedCapacity);
return source
.Select(inner =>
{
bool oldestDropped = false;
lock (queue)
{
if (queue.Count == boundedCapacity)
{
queue.Dequeue(); oldestDropped = true;
}
queue.Enqueue(inner);
}
if (oldestDropped) return null;
return Observable.Defer(() =>
{
lock (queue) return queue.Dequeue();
});
})
.Where(inner => inner != null)
.Merge(maximumConcurrency);
});
}
此实现基于以下假设:内置 Merge
运算符从不订阅同一子序列两次。否则语句 queue.Dequeue()
可能会在空的 queue
上调用,并导致异常。