如何处理 IDisposableValue 的 IObservable
How to handle IObservable of IDisposableValue
入门;在我的整个代码库中,我经常需要使用池化内存块。这样做是出于性能原因,以减少垃圾收集(制作实时视频游戏引擎组件)。我通过将类型公开为 IDisposableValue 来处理此问题,您只能在处理包装器之前访问 T 值。您将包装器配置为 return 池中要重用的值。
我建立了数据处理流,使用这些包装值来响应随时间发生的事件。这通常是 Observables/Reactive 扩展的完美候选者,除了必须处理包装器本质上是一种可变性形式,这是你在反应时不想要的。如果一个订阅者在完成包装器后处理包装器,但第二个观察者仍在使用它,则包装器将抛出异常。
预期目标: 让每个订阅者在原始真实包装值上接收一个单独的包装器。然后,只有在每个订阅者处理他们的个人包装器(想想 RefCountDisposable)后,才会处理基础值。因此,每个订阅者都可以根据需要使用该值,并且他们通过处置表示他们已完成。当所有这些都完成后,该值将被释放回池中。
唯一的问题是我不知道如何在 RX 中正确实现它。这是处理我的情况的合适方法吗?如果是的话,有关于如何实际实施它的任何指示吗?
编辑 1 - 使用 ISubject 的肮脏解决方案:
我尝试使用 Observable 的各种组合使其工作。Select/Create/Defer 但无法实现上述预期目标。相反,我不得不转向使用主题,我知道这是回避的。这是我当前的代码。
public class SharedDisposableValueSubject<T> : AbstractDisposable, ISubject<IDisposableValue<T>>
{
private readonly Subject<SharedDisposable> subject;
private readonly SubscriptionCounter<SharedDisposable> counter;
private readonly IObservable<IDisposableValue<T>> observable;
public SharedDisposableValueSubject()
{
this.subject = new Subject<SharedDisposable>();
this.counter = new SubscriptionCounter<SharedDisposable>(this.subject);
this.observable = this.counter.Source.Select(value => value.GetValue());
}
/// <inheritdoc />
public void OnCompleted() => this.subject.OnCompleted();
/// <inheritdoc />
public void OnError(Exception error) => this.subject.OnError(error);
/// <inheritdoc />
public void OnNext(IDisposableValue<T> value) =>
this.subject.OnNext(new SharedDisposable(value, this.counter.Count));
/// <inheritdoc />
public IDisposable Subscribe(IObserver<IDisposableValue<T>> observer) => this.observable.Subscribe(observer);
/// <inheritdoc />
protected override void ManagedDisposal() => this.subject.Dispose();
private class SharedDisposable
{
private readonly IDisposableValue<T> value;
private readonly AtomicInt count;
public SharedDisposable(IDisposableValue<T> value, int count)
{
Contracts.Requires.That(count >= 0);
this.value = value;
this.count = new AtomicInt(count);
if (count == 0)
{
this.value?.Dispose();
}
}
public IDisposableValue<T> GetValue() => new ValuePin(this);
private class ValuePin : AbstractDisposable, IDisposableValue<T>
{
private readonly SharedDisposable parent;
public ValuePin(SharedDisposable parent)
{
Contracts.Requires.That(parent != null);
this.parent = parent;
}
/// <inheritdoc />
public T Value => this.parent.value != null ? this.parent.value.Value : default(T);
/// <inheritdoc />
protected override void ManagedDisposal()
{
if (this.parent.count.Decrement() == 0)
{
this.parent.value?.Dispose();
}
}
}
}
}
public class SubscriptionCounter<T>
{
private readonly AtomicInt count = new AtomicInt(0);
public SubscriptionCounter(IObservable<T> source)
{
Contracts.Requires.That(source != null);
this.Source = Observable.Create<T>(observer =>
{
this.count.Increment();
return new Subscription(source.Subscribe(observer), this.count);
});
}
public int Count => this.count.Read();
public IObservable<T> Source { get; }
private class Subscription : AbstractDisposable
{
private readonly IDisposable subscription;
private readonly AtomicInt count;
public Subscription(IDisposable subscription, AtomicInt count)
{
Contracts.Requires.That(subscription != null);
Contracts.Requires.That(count != null);
this.subscription = subscription;
this.count = count;
}
/// <inheritdoc />
protected override void ManagedDisposal()
{
this.subscription.Dispose();
this.count.Decrement();
}
}
}
public interface IDisposableValue<out T> : IDisposable
{
bool IsDisposed { get; }
T Value { get; }
}
AbstractDisposable 只是一个基础 class 一次性模式的实现,适用于不保留非托管类型的类型。它确保 ManagedDisposal() 仅在第一次调用 Dispose() 时被调用一次。 AtomicInt 是 Interlocked on int 的包装器,用于为 int 提供线程安全的原子更新。
我的测试代码展示了如何使用 SharedDisposableValueSubject;
public static class SharedDisposableValueSubjectTests
{
[Fact]
public static void NoSubcribersValueAutoDisposes()
{
using (var subject = new SharedDisposableValueSubject<int>())
{
var sourceValue = new DisposableWrapper<int>(0);
sourceValue.IsDisposed.Should().BeFalse();
subject.OnNext(sourceValue);
sourceValue.IsDisposed.Should().BeTrue();
subject.OnCompleted();
}
}
[Fact]
public static void SingleSurcriber()
{
using (var subject = new SharedDisposableValueSubject<int>())
{
var testNumber = 1;
var sourceValue = new DisposableWrapper<int>(testNumber);
sourceValue.IsDisposed.Should().BeFalse();
IDisposableValue<int> retrieved = null;
subject.Subscribe(value => retrieved = value);
// value retrieved from sequence but not disposed yet
subject.OnNext(sourceValue);
retrieved.Should().NotBeNull();
retrieved.Value.Should().Be(testNumber);
retrieved.IsDisposed.Should().BeFalse();
sourceValue.IsDisposed.Should().BeFalse();
// disposing retrieved disposes the source value
retrieved.Dispose();
retrieved.IsDisposed.Should().BeTrue();
sourceValue.IsDisposed.Should().BeTrue();
subject.OnCompleted();
}
}
[Fact]
public static void ManySubcribers()
{
using (var subject = new SharedDisposableValueSubject<int>())
{
var testNumber = 1;
var sourceValue = new DisposableWrapper<int>(testNumber);
sourceValue.IsDisposed.Should().BeFalse();
IDisposableValue<int> retrieved1 = null;
subject.Subscribe(value => retrieved1 = value);
IDisposableValue<int> retrieved2 = null;
subject.Subscribe(value => retrieved2 = value);
// value retrieved from sequence but not disposed yet
subject.OnNext(sourceValue);
retrieved1.Should().NotBeNull();
retrieved1.Value.Should().Be(testNumber);
retrieved1.IsDisposed.Should().BeFalse();
retrieved2.Should().NotBeNull();
retrieved2.Value.Should().Be(testNumber);
retrieved2.IsDisposed.Should().BeFalse();
sourceValue.IsDisposed.Should().BeFalse();
// disposing only 1 retrieved value does not yet dispose the source value
retrieved1.Dispose();
retrieved1.IsDisposed.Should().BeTrue();
retrieved2.IsDisposed.Should().BeFalse();
retrieved2.Value.Should().Be(testNumber);
sourceValue.IsDisposed.Should().BeFalse();
// disposing both retrieved values disposes the source value
retrieved2.Dispose();
retrieved2.IsDisposed.Should().BeTrue();
sourceValue.IsDisposed.Should().BeTrue();
subject.OnCompleted();
}
}
[Fact]
public static void DisposingManyTimesStillRequiresEachSubscriberToDispose()
{
using (var subject = new SharedDisposableValueSubject<int>())
{
var testNumber = 1;
var sourceValue = new DisposableWrapper<int>(testNumber);
sourceValue.IsDisposed.Should().BeFalse();
IDisposableValue<int> retrieved1 = null;
subject.Subscribe(value => retrieved1 = value);
IDisposableValue<int> retrieved2 = null;
subject.Subscribe(value => retrieved2 = value);
subject.OnNext(sourceValue);
// disposing only 1 retrieved value does not yet dispose the source value
// even though the retrieved value is disposed many times
retrieved1.Dispose();
retrieved1.Dispose();
retrieved1.Dispose();
retrieved1.IsDisposed.Should().BeTrue();
retrieved2.IsDisposed.Should().BeFalse();
sourceValue.IsDisposed.Should().BeFalse();
// disposing both retrieved values disposes the source value
retrieved2.Dispose();
retrieved2.IsDisposed.Should().BeTrue();
sourceValue.IsDisposed.Should().BeTrue();
subject.OnCompleted();
}
}
[Fact]
public static void SingleSubcriberUnsubcribes()
{
using (var subject = new SharedDisposableValueSubject<int>())
{
var testNumber = 1;
var sourceValue = new DisposableWrapper<int>(testNumber);
sourceValue.IsDisposed.Should().BeFalse();
var subscription = subject.Subscribe(value => { });
subscription.Dispose();
// source value auto disposes because no subscribers
subject.OnNext(sourceValue);
sourceValue.IsDisposed.Should().BeTrue();
subject.OnCompleted();
}
}
[Fact]
public static void SubcriberUnsubcribes()
{
using (var subject = new SharedDisposableValueSubject<int>())
{
var testNumber = 1;
var sourceValue = new DisposableWrapper<int>(testNumber);
sourceValue.IsDisposed.Should().BeFalse();
IDisposableValue<int> retrieved = null;
subject.Subscribe(value => retrieved = value);
var subscription = subject.Subscribe(value => { });
subscription.Dispose();
// value retrieved from sequence but not disposed yet
subject.OnNext(sourceValue);
retrieved.Should().NotBeNull();
retrieved.Value.Should().Be(testNumber);
retrieved.IsDisposed.Should().BeFalse();
sourceValue.IsDisposed.Should().BeFalse();
// disposing retrieved causes source to be disposed
retrieved.Dispose();
retrieved.IsDisposed.Should().BeTrue();
sourceValue.IsDisposed.Should().BeTrue();
subject.OnCompleted();
}
}
[Fact]
public static async Task DelayedSubcriberAsync()
{
using (var subject = new SharedDisposableValueSubject<int>())
{
var testNumber = 1;
var sourceValue = new DisposableWrapper<int>(testNumber);
sourceValue.IsDisposed.Should().BeFalse();
// delay countdown event used just to ensure that the value isn't disposed until assertions checked
var delay = new AsyncCountdownEvent(1);
var disposed = new AsyncCountdownEvent(2);
subject.Delay(TimeSpan.FromSeconds(1)).Subscribe(async value =>
{
await delay.WaitAsync().DontMarshallContext();
value.Dispose();
disposed.Signal(1);
});
subject.Subscribe(value =>
{
value.Dispose();
disposed.Signal(1);
});
// value is not yet disposed
subject.OnNext(sourceValue);
sourceValue.IsDisposed.Should().BeFalse();
// wait for value to be disposed
delay.Signal(1);
await disposed.WaitAsync().DontMarshallContext();
sourceValue.IsDisposed.Should().BeTrue();
subject.OnCompleted();
}
}
[Fact]
public static void MultipleObservedValues()
{
using (var subject = new SharedDisposableValueSubject<int>())
{
var testNumber1 = 1;
var sourceValue1 = new DisposableWrapper<int>(testNumber1);
sourceValue1.IsDisposed.Should().BeFalse();
var testNumber2 = 2;
var sourceValue2 = new DisposableWrapper<int>(testNumber2);
sourceValue2.IsDisposed.Should().BeFalse();
IDisposableValue<int> retrieved = null;
subject.Subscribe(value => retrieved = value);
// first test value
// value retrieved from sequence but not disposed yet
subject.OnNext(sourceValue1);
retrieved.Should().NotBeNull();
retrieved.Value.Should().Be(testNumber1);
retrieved.IsDisposed.Should().BeFalse();
sourceValue1.IsDisposed.Should().BeFalse();
// disposing retrieved disposes the source value
retrieved.Dispose();
retrieved.IsDisposed.Should().BeTrue();
sourceValue1.IsDisposed.Should().BeTrue();
// second test value
// value retrieved from sequence but not disposed yet
subject.OnNext(sourceValue2);
retrieved.Should().NotBeNull();
retrieved.Value.Should().Be(testNumber2);
retrieved.IsDisposed.Should().BeFalse();
sourceValue2.IsDisposed.Should().BeFalse();
// disposing retrieved disposes the source value
retrieved.Dispose();
retrieved.IsDisposed.Should().BeTrue();
sourceValue2.IsDisposed.Should().BeTrue();
subject.OnCompleted();
}
}
}
所有这些都通过了,但我意识到您可以使用 observable 做很多事情,所以可能会有一些我没有考虑过的用例会破坏这个实现。如果您知道任何问题,请告诉我。也可能只是我试图让 Rx 做一些它本来不想做的事情。
编辑 2 - 使用发布的解决方案:
我使用 Publish 来包装 SharedDisposable 中原始可观察对象的一次性值,保证每个原始值只包装一次。然后发布的 observable 被订阅者计数,每个订阅者获得一个单独的 ValuePin,当处理时减少 SharedDisposable 上的计数。当 SharedDisposable 计数达到 0 时,它会释放原始值。
我尝试不进行订阅计数,而是使用它,因此每次发出 ValuePin 时都会增加计数,但我找不到一种方法来保证它会在允许之前为每个订阅者创建 ValuePins订户处置它们。这导致订阅者 1 获得他们的 pin,计数从 0 变为 1,然后在订阅者 2 获得他们的 pin 之前处理该 pin,计数从 1 变为 0 触发原始值被处理,然后订阅者 2 应该收到一个别针,但现在为时已晚。
public static IObservable<IDisposableValue<T>> ShareDisposable<T>(this IObservable<IDisposableValue<T>> source)
{
Contracts.Requires.That(source != null);
var published = source.Select(value => new SharedDisposable<T>(value)).Publish();
var counter = new SubscriptionCounter<SharedDisposable<T>>(published);
published.Connect();
return counter.CountedSource.Select(value => value.GetValue(counter.Count));
}
private class SharedDisposable<T>
{
private const int Uninitialized = -1;
private readonly IDisposableValue<T> value;
private readonly AtomicInt count;
public SharedDisposable(IDisposableValue<T> value)
{
this.value = value;
this.count = new AtomicInt(Uninitialized);
}
public IDisposableValue<T> GetValue(int subscriberCount)
{
Contracts.Requires.That(subscriberCount >= 0);
this.count.CompareExchange(subscriberCount, Uninitialized);
return new ValuePin(this);
}
private class ValuePin : AbstractDisposable, IDisposableValue<T>
{
private readonly SharedDisposable<T> parent;
public ValuePin(SharedDisposable<T> parent)
{
Contracts.Requires.That(parent != null);
this.parent = parent;
}
/// <inheritdoc />
public T Value => this.parent.value != null ? this.parent.value.Value : default(T);
/// <inheritdoc />
protected override void ManagedDisposal()
{
if (this.parent.count.Decrement() == 0)
{
this.parent.value?.Dispose();
}
}
}
}
这当然看起来更好,因为我不必以任何方式使用主题,尽管订阅者计数感觉很脏。特别是因为我需要在第一个 ValuePin 发出之前不初始化计数。需要明确的是,我正在尝试处理由 0 到许多订阅者共享的可观察对象产生的值的处理,而不是处理与可观察对象本身的连接,这就是为什么我不使用 RefCount 而不是 Connect .
我想你可以重新计算一次性的。这将需要发布者启动引用计数,然后每个订阅者递增和递减计数器。您可以使用 RefCountDisposable 来执行此操作。我只会考虑对 private/internal 代码执行此操作,否则你可能会让漏水的消费者破坏你的系统。
Rx 的替代解决方案可能是查看 Disruptor 模式。
入门;在我的整个代码库中,我经常需要使用池化内存块。这样做是出于性能原因,以减少垃圾收集(制作实时视频游戏引擎组件)。我通过将类型公开为 IDisposableValue 来处理此问题,您只能在处理包装器之前访问 T 值。您将包装器配置为 return 池中要重用的值。
我建立了数据处理流,使用这些包装值来响应随时间发生的事件。这通常是 Observables/Reactive 扩展的完美候选者,除了必须处理包装器本质上是一种可变性形式,这是你在反应时不想要的。如果一个订阅者在完成包装器后处理包装器,但第二个观察者仍在使用它,则包装器将抛出异常。
预期目标: 让每个订阅者在原始真实包装值上接收一个单独的包装器。然后,只有在每个订阅者处理他们的个人包装器(想想 RefCountDisposable)后,才会处理基础值。因此,每个订阅者都可以根据需要使用该值,并且他们通过处置表示他们已完成。当所有这些都完成后,该值将被释放回池中。
唯一的问题是我不知道如何在 RX 中正确实现它。这是处理我的情况的合适方法吗?如果是的话,有关于如何实际实施它的任何指示吗?
编辑 1 - 使用 ISubject 的肮脏解决方案:
我尝试使用 Observable 的各种组合使其工作。Select/Create/Defer 但无法实现上述预期目标。相反,我不得不转向使用主题,我知道这是回避的。这是我当前的代码。
public class SharedDisposableValueSubject<T> : AbstractDisposable, ISubject<IDisposableValue<T>>
{
private readonly Subject<SharedDisposable> subject;
private readonly SubscriptionCounter<SharedDisposable> counter;
private readonly IObservable<IDisposableValue<T>> observable;
public SharedDisposableValueSubject()
{
this.subject = new Subject<SharedDisposable>();
this.counter = new SubscriptionCounter<SharedDisposable>(this.subject);
this.observable = this.counter.Source.Select(value => value.GetValue());
}
/// <inheritdoc />
public void OnCompleted() => this.subject.OnCompleted();
/// <inheritdoc />
public void OnError(Exception error) => this.subject.OnError(error);
/// <inheritdoc />
public void OnNext(IDisposableValue<T> value) =>
this.subject.OnNext(new SharedDisposable(value, this.counter.Count));
/// <inheritdoc />
public IDisposable Subscribe(IObserver<IDisposableValue<T>> observer) => this.observable.Subscribe(observer);
/// <inheritdoc />
protected override void ManagedDisposal() => this.subject.Dispose();
private class SharedDisposable
{
private readonly IDisposableValue<T> value;
private readonly AtomicInt count;
public SharedDisposable(IDisposableValue<T> value, int count)
{
Contracts.Requires.That(count >= 0);
this.value = value;
this.count = new AtomicInt(count);
if (count == 0)
{
this.value?.Dispose();
}
}
public IDisposableValue<T> GetValue() => new ValuePin(this);
private class ValuePin : AbstractDisposable, IDisposableValue<T>
{
private readonly SharedDisposable parent;
public ValuePin(SharedDisposable parent)
{
Contracts.Requires.That(parent != null);
this.parent = parent;
}
/// <inheritdoc />
public T Value => this.parent.value != null ? this.parent.value.Value : default(T);
/// <inheritdoc />
protected override void ManagedDisposal()
{
if (this.parent.count.Decrement() == 0)
{
this.parent.value?.Dispose();
}
}
}
}
}
public class SubscriptionCounter<T>
{
private readonly AtomicInt count = new AtomicInt(0);
public SubscriptionCounter(IObservable<T> source)
{
Contracts.Requires.That(source != null);
this.Source = Observable.Create<T>(observer =>
{
this.count.Increment();
return new Subscription(source.Subscribe(observer), this.count);
});
}
public int Count => this.count.Read();
public IObservable<T> Source { get; }
private class Subscription : AbstractDisposable
{
private readonly IDisposable subscription;
private readonly AtomicInt count;
public Subscription(IDisposable subscription, AtomicInt count)
{
Contracts.Requires.That(subscription != null);
Contracts.Requires.That(count != null);
this.subscription = subscription;
this.count = count;
}
/// <inheritdoc />
protected override void ManagedDisposal()
{
this.subscription.Dispose();
this.count.Decrement();
}
}
}
public interface IDisposableValue<out T> : IDisposable
{
bool IsDisposed { get; }
T Value { get; }
}
AbstractDisposable 只是一个基础 class 一次性模式的实现,适用于不保留非托管类型的类型。它确保 ManagedDisposal() 仅在第一次调用 Dispose() 时被调用一次。 AtomicInt 是 Interlocked on int 的包装器,用于为 int 提供线程安全的原子更新。
我的测试代码展示了如何使用 SharedDisposableValueSubject;
public static class SharedDisposableValueSubjectTests
{
[Fact]
public static void NoSubcribersValueAutoDisposes()
{
using (var subject = new SharedDisposableValueSubject<int>())
{
var sourceValue = new DisposableWrapper<int>(0);
sourceValue.IsDisposed.Should().BeFalse();
subject.OnNext(sourceValue);
sourceValue.IsDisposed.Should().BeTrue();
subject.OnCompleted();
}
}
[Fact]
public static void SingleSurcriber()
{
using (var subject = new SharedDisposableValueSubject<int>())
{
var testNumber = 1;
var sourceValue = new DisposableWrapper<int>(testNumber);
sourceValue.IsDisposed.Should().BeFalse();
IDisposableValue<int> retrieved = null;
subject.Subscribe(value => retrieved = value);
// value retrieved from sequence but not disposed yet
subject.OnNext(sourceValue);
retrieved.Should().NotBeNull();
retrieved.Value.Should().Be(testNumber);
retrieved.IsDisposed.Should().BeFalse();
sourceValue.IsDisposed.Should().BeFalse();
// disposing retrieved disposes the source value
retrieved.Dispose();
retrieved.IsDisposed.Should().BeTrue();
sourceValue.IsDisposed.Should().BeTrue();
subject.OnCompleted();
}
}
[Fact]
public static void ManySubcribers()
{
using (var subject = new SharedDisposableValueSubject<int>())
{
var testNumber = 1;
var sourceValue = new DisposableWrapper<int>(testNumber);
sourceValue.IsDisposed.Should().BeFalse();
IDisposableValue<int> retrieved1 = null;
subject.Subscribe(value => retrieved1 = value);
IDisposableValue<int> retrieved2 = null;
subject.Subscribe(value => retrieved2 = value);
// value retrieved from sequence but not disposed yet
subject.OnNext(sourceValue);
retrieved1.Should().NotBeNull();
retrieved1.Value.Should().Be(testNumber);
retrieved1.IsDisposed.Should().BeFalse();
retrieved2.Should().NotBeNull();
retrieved2.Value.Should().Be(testNumber);
retrieved2.IsDisposed.Should().BeFalse();
sourceValue.IsDisposed.Should().BeFalse();
// disposing only 1 retrieved value does not yet dispose the source value
retrieved1.Dispose();
retrieved1.IsDisposed.Should().BeTrue();
retrieved2.IsDisposed.Should().BeFalse();
retrieved2.Value.Should().Be(testNumber);
sourceValue.IsDisposed.Should().BeFalse();
// disposing both retrieved values disposes the source value
retrieved2.Dispose();
retrieved2.IsDisposed.Should().BeTrue();
sourceValue.IsDisposed.Should().BeTrue();
subject.OnCompleted();
}
}
[Fact]
public static void DisposingManyTimesStillRequiresEachSubscriberToDispose()
{
using (var subject = new SharedDisposableValueSubject<int>())
{
var testNumber = 1;
var sourceValue = new DisposableWrapper<int>(testNumber);
sourceValue.IsDisposed.Should().BeFalse();
IDisposableValue<int> retrieved1 = null;
subject.Subscribe(value => retrieved1 = value);
IDisposableValue<int> retrieved2 = null;
subject.Subscribe(value => retrieved2 = value);
subject.OnNext(sourceValue);
// disposing only 1 retrieved value does not yet dispose the source value
// even though the retrieved value is disposed many times
retrieved1.Dispose();
retrieved1.Dispose();
retrieved1.Dispose();
retrieved1.IsDisposed.Should().BeTrue();
retrieved2.IsDisposed.Should().BeFalse();
sourceValue.IsDisposed.Should().BeFalse();
// disposing both retrieved values disposes the source value
retrieved2.Dispose();
retrieved2.IsDisposed.Should().BeTrue();
sourceValue.IsDisposed.Should().BeTrue();
subject.OnCompleted();
}
}
[Fact]
public static void SingleSubcriberUnsubcribes()
{
using (var subject = new SharedDisposableValueSubject<int>())
{
var testNumber = 1;
var sourceValue = new DisposableWrapper<int>(testNumber);
sourceValue.IsDisposed.Should().BeFalse();
var subscription = subject.Subscribe(value => { });
subscription.Dispose();
// source value auto disposes because no subscribers
subject.OnNext(sourceValue);
sourceValue.IsDisposed.Should().BeTrue();
subject.OnCompleted();
}
}
[Fact]
public static void SubcriberUnsubcribes()
{
using (var subject = new SharedDisposableValueSubject<int>())
{
var testNumber = 1;
var sourceValue = new DisposableWrapper<int>(testNumber);
sourceValue.IsDisposed.Should().BeFalse();
IDisposableValue<int> retrieved = null;
subject.Subscribe(value => retrieved = value);
var subscription = subject.Subscribe(value => { });
subscription.Dispose();
// value retrieved from sequence but not disposed yet
subject.OnNext(sourceValue);
retrieved.Should().NotBeNull();
retrieved.Value.Should().Be(testNumber);
retrieved.IsDisposed.Should().BeFalse();
sourceValue.IsDisposed.Should().BeFalse();
// disposing retrieved causes source to be disposed
retrieved.Dispose();
retrieved.IsDisposed.Should().BeTrue();
sourceValue.IsDisposed.Should().BeTrue();
subject.OnCompleted();
}
}
[Fact]
public static async Task DelayedSubcriberAsync()
{
using (var subject = new SharedDisposableValueSubject<int>())
{
var testNumber = 1;
var sourceValue = new DisposableWrapper<int>(testNumber);
sourceValue.IsDisposed.Should().BeFalse();
// delay countdown event used just to ensure that the value isn't disposed until assertions checked
var delay = new AsyncCountdownEvent(1);
var disposed = new AsyncCountdownEvent(2);
subject.Delay(TimeSpan.FromSeconds(1)).Subscribe(async value =>
{
await delay.WaitAsync().DontMarshallContext();
value.Dispose();
disposed.Signal(1);
});
subject.Subscribe(value =>
{
value.Dispose();
disposed.Signal(1);
});
// value is not yet disposed
subject.OnNext(sourceValue);
sourceValue.IsDisposed.Should().BeFalse();
// wait for value to be disposed
delay.Signal(1);
await disposed.WaitAsync().DontMarshallContext();
sourceValue.IsDisposed.Should().BeTrue();
subject.OnCompleted();
}
}
[Fact]
public static void MultipleObservedValues()
{
using (var subject = new SharedDisposableValueSubject<int>())
{
var testNumber1 = 1;
var sourceValue1 = new DisposableWrapper<int>(testNumber1);
sourceValue1.IsDisposed.Should().BeFalse();
var testNumber2 = 2;
var sourceValue2 = new DisposableWrapper<int>(testNumber2);
sourceValue2.IsDisposed.Should().BeFalse();
IDisposableValue<int> retrieved = null;
subject.Subscribe(value => retrieved = value);
// first test value
// value retrieved from sequence but not disposed yet
subject.OnNext(sourceValue1);
retrieved.Should().NotBeNull();
retrieved.Value.Should().Be(testNumber1);
retrieved.IsDisposed.Should().BeFalse();
sourceValue1.IsDisposed.Should().BeFalse();
// disposing retrieved disposes the source value
retrieved.Dispose();
retrieved.IsDisposed.Should().BeTrue();
sourceValue1.IsDisposed.Should().BeTrue();
// second test value
// value retrieved from sequence but not disposed yet
subject.OnNext(sourceValue2);
retrieved.Should().NotBeNull();
retrieved.Value.Should().Be(testNumber2);
retrieved.IsDisposed.Should().BeFalse();
sourceValue2.IsDisposed.Should().BeFalse();
// disposing retrieved disposes the source value
retrieved.Dispose();
retrieved.IsDisposed.Should().BeTrue();
sourceValue2.IsDisposed.Should().BeTrue();
subject.OnCompleted();
}
}
}
所有这些都通过了,但我意识到您可以使用 observable 做很多事情,所以可能会有一些我没有考虑过的用例会破坏这个实现。如果您知道任何问题,请告诉我。也可能只是我试图让 Rx 做一些它本来不想做的事情。
编辑 2 - 使用发布的解决方案:
我使用 Publish 来包装 SharedDisposable 中原始可观察对象的一次性值,保证每个原始值只包装一次。然后发布的 observable 被订阅者计数,每个订阅者获得一个单独的 ValuePin,当处理时减少 SharedDisposable 上的计数。当 SharedDisposable 计数达到 0 时,它会释放原始值。
我尝试不进行订阅计数,而是使用它,因此每次发出 ValuePin 时都会增加计数,但我找不到一种方法来保证它会在允许之前为每个订阅者创建 ValuePins订户处置它们。这导致订阅者 1 获得他们的 pin,计数从 0 变为 1,然后在订阅者 2 获得他们的 pin 之前处理该 pin,计数从 1 变为 0 触发原始值被处理,然后订阅者 2 应该收到一个别针,但现在为时已晚。
public static IObservable<IDisposableValue<T>> ShareDisposable<T>(this IObservable<IDisposableValue<T>> source)
{
Contracts.Requires.That(source != null);
var published = source.Select(value => new SharedDisposable<T>(value)).Publish();
var counter = new SubscriptionCounter<SharedDisposable<T>>(published);
published.Connect();
return counter.CountedSource.Select(value => value.GetValue(counter.Count));
}
private class SharedDisposable<T>
{
private const int Uninitialized = -1;
private readonly IDisposableValue<T> value;
private readonly AtomicInt count;
public SharedDisposable(IDisposableValue<T> value)
{
this.value = value;
this.count = new AtomicInt(Uninitialized);
}
public IDisposableValue<T> GetValue(int subscriberCount)
{
Contracts.Requires.That(subscriberCount >= 0);
this.count.CompareExchange(subscriberCount, Uninitialized);
return new ValuePin(this);
}
private class ValuePin : AbstractDisposable, IDisposableValue<T>
{
private readonly SharedDisposable<T> parent;
public ValuePin(SharedDisposable<T> parent)
{
Contracts.Requires.That(parent != null);
this.parent = parent;
}
/// <inheritdoc />
public T Value => this.parent.value != null ? this.parent.value.Value : default(T);
/// <inheritdoc />
protected override void ManagedDisposal()
{
if (this.parent.count.Decrement() == 0)
{
this.parent.value?.Dispose();
}
}
}
}
这当然看起来更好,因为我不必以任何方式使用主题,尽管订阅者计数感觉很脏。特别是因为我需要在第一个 ValuePin 发出之前不初始化计数。需要明确的是,我正在尝试处理由 0 到许多订阅者共享的可观察对象产生的值的处理,而不是处理与可观察对象本身的连接,这就是为什么我不使用 RefCount 而不是 Connect .
我想你可以重新计算一次性的。这将需要发布者启动引用计数,然后每个订阅者递增和递减计数器。您可以使用 RefCountDisposable 来执行此操作。我只会考虑对 private/internal 代码执行此操作,否则你可能会让漏水的消费者破坏你的系统。 Rx 的替代解决方案可能是查看 Disruptor 模式。