按事件完成 IObservable
Complete IObservable by event
有一种方法可以使用 Observable.FromEvent
将事件包装为可观察的。例如。这个 class:
class Generator<T>
{
event Action<T> onPush;
public IObservable<T> Items =>
Observable.FromEvent<T>(d => onPush += d, d => onPush -= d);
public void Push(T item) => onPush?.Invoke(item);
}
但是,我还没有找到一种方法也可以通过事件来完成可观察性 - 我该怎么做?
更新:
为了阐明我的意思,上面的 class 生成 IObservable<T>
,即 "endless",并且永远不会完成。我想让它由另一个事件完成,而不是让另一个可观察到。所以问题可以简化为:
如何使任意IObservable<T>
提前完成,即调用OnCompleted
通知?
一个 observable 表示通知或事件流。当一个可观察的来源来自一个事件时,它们本质上是无穷无尽的。 observable 连接到事件,引用对象,因此支持事件的对象永远不会超出范围。 .NET/C# 没有提供一种方法来表明一个事件将永远不会被再次调用,所以直接连接到事件的可观察对象是无穷无尽的。
这并不少见;大多数基于事件的可观察对象从未 OnCompleted
显式调用,对现实世界进行建模,很难明确地说某事永远不会再次发生。
然而,这不是问题:Observables 意味着 运行 无限,并且不会造成任何损害。未订阅的可观察对象不会占用太多资源。如果您对事件源可观察对象不感兴趣,取消订阅所有订阅就可以了。
一种方法是使用 Take
运算符之一,例如 TakeUntil
运算符(如下所述)。尝试以下代码(使用您的 Generator
class):
var g = new Generator<int>();
g.Items
.TakeUntil(i => i > 3)
.Subscribe(
i => Console.WriteLine($"OnNext: {i}"),
e => Console.WriteLine($"OnError: Message: {e.Message}"),
() => Console.WriteLine("OnCompleted")
);
g.Push(1);
g.Push(2);
g.Push(3);
g.Push(4);
g.Push(5);
g.Push(6);
输出:
OnNext: 1
OnNext: 2
OnNext: 3
OnNext: 4
OnCompleted
TakeUntil
在出现整数大于 3 的消息后取消订阅 Items
observable。这就是为什么有 OnCompleted 而没有 5、6 消息的原因。
另外,正如Enigmativity所说,你的Generator<T>
class与Subject<T>
基本相同,我建议你使用那个
原回答:
从事件中创建另一个可观察对象,然后使用 .TakeUntil
:
class Generator<T>
{
event Action<T> onPush;
event Action<Unit> onCompleted;
public IObservable<T> Items =>
Observable.FromEvent<T>(d => onPush += d, d => onPush -= d)
.TakeUntil(Completion);
public IObservable<Unit> Completion =>
Observable.FromEvent<Unit>(d => onCompleted += d, d => onCompleted -= d);
public void Push(T item) => onPush?.Invoke(item);
public void Complete() => onCompleted?.Invoke(Unit.Default);
}
有一种方法可以使用 Observable.FromEvent
将事件包装为可观察的。例如。这个 class:
class Generator<T>
{
event Action<T> onPush;
public IObservable<T> Items =>
Observable.FromEvent<T>(d => onPush += d, d => onPush -= d);
public void Push(T item) => onPush?.Invoke(item);
}
但是,我还没有找到一种方法也可以通过事件来完成可观察性 - 我该怎么做?
更新:
为了阐明我的意思,上面的 class 生成 IObservable<T>
,即 "endless",并且永远不会完成。我想让它由另一个事件完成,而不是让另一个可观察到。所以问题可以简化为:
如何使任意IObservable<T>
提前完成,即调用OnCompleted
通知?
一个 observable 表示通知或事件流。当一个可观察的来源来自一个事件时,它们本质上是无穷无尽的。 observable 连接到事件,引用对象,因此支持事件的对象永远不会超出范围。 .NET/C# 没有提供一种方法来表明一个事件将永远不会被再次调用,所以直接连接到事件的可观察对象是无穷无尽的。
这并不少见;大多数基于事件的可观察对象从未 OnCompleted
显式调用,对现实世界进行建模,很难明确地说某事永远不会再次发生。
然而,这不是问题:Observables 意味着 运行 无限,并且不会造成任何损害。未订阅的可观察对象不会占用太多资源。如果您对事件源可观察对象不感兴趣,取消订阅所有订阅就可以了。
一种方法是使用 Take
运算符之一,例如 TakeUntil
运算符(如下所述)。尝试以下代码(使用您的 Generator
class):
var g = new Generator<int>();
g.Items
.TakeUntil(i => i > 3)
.Subscribe(
i => Console.WriteLine($"OnNext: {i}"),
e => Console.WriteLine($"OnError: Message: {e.Message}"),
() => Console.WriteLine("OnCompleted")
);
g.Push(1);
g.Push(2);
g.Push(3);
g.Push(4);
g.Push(5);
g.Push(6);
输出:
OnNext: 1
OnNext: 2
OnNext: 3
OnNext: 4
OnCompleted
TakeUntil
在出现整数大于 3 的消息后取消订阅 Items
observable。这就是为什么有 OnCompleted 而没有 5、6 消息的原因。
另外,正如Enigmativity所说,你的Generator<T>
class与Subject<T>
基本相同,我建议你使用那个
原回答:
从事件中创建另一个可观察对象,然后使用 .TakeUntil
:
class Generator<T>
{
event Action<T> onPush;
event Action<Unit> onCompleted;
public IObservable<T> Items =>
Observable.FromEvent<T>(d => onPush += d, d => onPush -= d)
.TakeUntil(Completion);
public IObservable<Unit> Completion =>
Observable.FromEvent<Unit>(d => onCompleted += d, d => onCompleted -= d);
public void Push(T item) => onPush?.Invoke(item);
public void Complete() => onCompleted?.Invoke(Unit.Default);
}