热可观察和 IDisposable
Hot observable and IDisposable
我想找到有关作为事件类型的热可观察对象和 IDisposable 对象的最佳实践。
假设我的代码将 Bitmap 对象生成为热可观察对象,并且我有多个订阅者。
例如:
public static IObservable<Bitmap> ImagesInFolder(string path, IScheduler scheduler)
{
return Directory.GetFiles(path, "*.bmp")
.ToObservable(scheduler)
.Select(x => new Bitmap(x))
.Publish()
.RefCount();
}
public void Main()
{
var images = ImagesInFolder("c:\Users\VASIYA\Desktop\Sample Images", TaskPoolScheduler.Instance);
var process1 = images.Subscribe(SaveBwImages);
var process2 = images.Subscribe(SaveScaledImages);
var process3 = images.Select(Cats).Subscribe(SaveCatsImages);
}
所以问题是:处理作为热可观察对象来源的一次性资源的最佳实践是什么?
在这个例子中,我想在使用后处理图像,但我不知道 - 什么时候
到底是?
订阅事件的调用顺序并不明显,因此我无法处理 'last'。
提前致谢。
您的 Observable 不热。这是一个具有共享源的冷可观察对象,它只会让后续的观察者表现得好像他们得到了一个热可观察对象。最好将其描述为温暖的可观察对象。
我们来看一个例子:
var query = Observable.Range(0, 3).ObserveOn(Scheduler.Default).Publish().RefCount();
query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("A"); });
query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("B"); });
query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("C"); });
Thread.Sleep(10000);
query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("D"); });
Observable
.Range(0, 3)
.ObserveOn(Scheduler.Default)
.Publish()
.RefCount()
.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("E"); });
当我 运行 我得到:
A
A
B
C
A
B
C
E
E
E
"B" & "C" 观察者错过了序列的第一个值。
并且,在 "A"、"B" 和 "C" 观察者完成后,序列完成,因此 "D" 永远不会获得值。我不得不创建一个全新的 Observable 来获取要显示的值 "E"。
因此,在您的代码中您遇到了一个问题,如果第一个观察者在第二个和第三个订阅之前完成了一个或多个值,那么这些观察者会丢失值。是你想要的吗?
尽管如此,您的问题询问的是如何处理来自可观察对象的一次性值 return。如果使用 Observable.Using
.
就很简单
这里有一个与您的代码类似的情况:
public static IObservable<IDisposable> ImagesInFolder(IScheduler scheduler)
{
return
Observable
.Range(0, 3)
.ObserveOn(Scheduler.Default)
.SelectMany(x =>
Observable
.Using(
() => Disposable.Create(() => Console.WriteLine("Disposed!")),
y => Observable.Return(y)))
.Publish()
.RefCount();
}
现在如果我 运行 这个代码:
var query = ImagesInFolder(Scheduler.Default);
query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("A"); });
query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("B"); });
query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("C"); });
Thread.Sleep(10000);
query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("D"); });
我得到这个输出:
A
B
C
Disposed!
A
B
C
Disposed!
A
B
C
Disposed!
再次 "D" 永远不会产生任何值 - "B" 和 "C" 可能会丢失值,但这确实展示了如何 return 一个可观察的值自动处理 observer/s is/are 完成。
您的代码如下所示:
public static IObservable<System.Drawing.Bitmap> ImagesInFolder(string path, IScheduler scheduler)
{
return
Directory
.GetFiles(path, "*.bmp")
.ToObservable(scheduler)
.SelectMany(x =>
Observable
.Using(
() => new System.Drawing.Bitmap(x),
bm => Observable.Return(bm)))
.Publish()
.RefCount();
}
但是,您仍然处于可能缺失值的状态。
因此你需要真正做到这一点:
public static IConnectableObservable<System.Drawing.Bitmap> ImagesInFolder(string path, IScheduler scheduler)
{
return
Directory
.GetFiles(path, "*.bmp")
.ToObservable(scheduler)
.SelectMany(x =>
Observable
.Using(
() => new System.Drawing.Bitmap(x),
bm => Observable.Return(bm)))
.Publish();
}
然后你这样称呼它:
public void Main()
{
var images = ImagesInFolder("c:\Users\VASIYA\Desktop\Sample Images", TaskPoolScheduler.Instance);
var process1 = images.Subscribe(SaveBwImages);
var process2 = images.Subscribe(SaveScaledImages);
var process3 = images.Select(Cats).Subscribe(SaveCatsImages);
images.Connect();
}
另一种选择是删除整个 .Publish().RefCount()
代码,并确保您在订阅时自己正确执行。
试试这个代码:
void Main()
{
ImagesInFolder(Scheduler.Default)
.Publish(iif =>
Observable
.Merge(
iif.Select(x => { Thread.Sleep(1000); Console.WriteLine("A"); return "A"; }),
iif.Select(x => { Thread.Sleep(3000); Console.WriteLine("B"); return "B"; }),
iif.Select(x => { Thread.Sleep(2000); Console.WriteLine("C"); return "C"; })))
.Subscribe();
}
public static IObservable<IDisposable> ImagesInFolder(IScheduler scheduler)
{
return
Observable
.Range(0, 3)
.ObserveOn(Scheduler.Default)
.SelectMany(x =>
Observable
.Using(
() => Disposable.Create(() => Console.WriteLine("Disposed!")),
y => Observable.Return(y)));
}
我明白了:
A
B
C
Disposed!
A
B
C
Disposed!
A
B
C
Disposed!
同样,每个观察者有运行后一个Disposed!
,但是现在的问题是我改变了每个观察者的处理延迟,但是代码仍然输出的是顺序增加了观察员。问题是 Rx 运行s 每个观察者按顺序排列,每个产生的值按顺序排列。
我希望您认为可以使用 .Publish()
进行并行处理。你没有。
并行地达到 运行 的方法是完全放弃 .Publish()
。
就做这种事情:
void Main()
{
ImagesInFolder(Scheduler.Default).Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("A"); });
ImagesInFolder(Scheduler.Default).Subscribe(x => { Thread.Sleep(3000); Console.WriteLine("B"); });
ImagesInFolder(Scheduler.Default).Subscribe(x => { Thread.Sleep(2000); Console.WriteLine("C"); });
}
public static IObservable<IDisposable> ImagesInFolder(IScheduler scheduler)
{
return
Observable
.Range(0, 3)
.ObserveOn(Scheduler.Default)
.SelectMany(x =>
Observable
.Using(
() => Disposable.Create(() => Console.WriteLine("Disposed!")),
y => Observable.Return(y)));
}
我现在明白了:
A
Disposed!
C
Disposed!
A
Disposed!
B
Disposed!
A
Disposed!
C
Disposed!
C
Disposed!
B
Disposed!
B
Disposed!
代码现在 运行 并行并尽快完成 - 并在订阅完成时正确处理 IDisposable
。你只是没有得到与每个观察者共享单一一次性资源的乐趣,但你也没有得到所有的行为问题。
我想找到有关作为事件类型的热可观察对象和 IDisposable 对象的最佳实践。
假设我的代码将 Bitmap 对象生成为热可观察对象,并且我有多个订阅者。 例如:
public static IObservable<Bitmap> ImagesInFolder(string path, IScheduler scheduler)
{
return Directory.GetFiles(path, "*.bmp")
.ToObservable(scheduler)
.Select(x => new Bitmap(x))
.Publish()
.RefCount();
}
public void Main()
{
var images = ImagesInFolder("c:\Users\VASIYA\Desktop\Sample Images", TaskPoolScheduler.Instance);
var process1 = images.Subscribe(SaveBwImages);
var process2 = images.Subscribe(SaveScaledImages);
var process3 = images.Select(Cats).Subscribe(SaveCatsImages);
}
所以问题是:处理作为热可观察对象来源的一次性资源的最佳实践是什么?
在这个例子中,我想在使用后处理图像,但我不知道 - 什么时候 到底是?
订阅事件的调用顺序并不明显,因此我无法处理 'last'。
提前致谢。
您的 Observable 不热。这是一个具有共享源的冷可观察对象,它只会让后续的观察者表现得好像他们得到了一个热可观察对象。最好将其描述为温暖的可观察对象。
我们来看一个例子:
var query = Observable.Range(0, 3).ObserveOn(Scheduler.Default).Publish().RefCount();
query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("A"); });
query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("B"); });
query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("C"); });
Thread.Sleep(10000);
query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("D"); });
Observable
.Range(0, 3)
.ObserveOn(Scheduler.Default)
.Publish()
.RefCount()
.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("E"); });
当我 运行 我得到:
A A B C A B C E E E
"B" & "C" 观察者错过了序列的第一个值。
并且,在 "A"、"B" 和 "C" 观察者完成后,序列完成,因此 "D" 永远不会获得值。我不得不创建一个全新的 Observable 来获取要显示的值 "E"。
因此,在您的代码中您遇到了一个问题,如果第一个观察者在第二个和第三个订阅之前完成了一个或多个值,那么这些观察者会丢失值。是你想要的吗?
尽管如此,您的问题询问的是如何处理来自可观察对象的一次性值 return。如果使用 Observable.Using
.
这里有一个与您的代码类似的情况:
public static IObservable<IDisposable> ImagesInFolder(IScheduler scheduler)
{
return
Observable
.Range(0, 3)
.ObserveOn(Scheduler.Default)
.SelectMany(x =>
Observable
.Using(
() => Disposable.Create(() => Console.WriteLine("Disposed!")),
y => Observable.Return(y)))
.Publish()
.RefCount();
}
现在如果我 运行 这个代码:
var query = ImagesInFolder(Scheduler.Default);
query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("A"); });
query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("B"); });
query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("C"); });
Thread.Sleep(10000);
query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("D"); });
我得到这个输出:
A B C Disposed! A B C Disposed! A B C Disposed!
再次 "D" 永远不会产生任何值 - "B" 和 "C" 可能会丢失值,但这确实展示了如何 return 一个可观察的值自动处理 observer/s is/are 完成。
您的代码如下所示:
public static IObservable<System.Drawing.Bitmap> ImagesInFolder(string path, IScheduler scheduler)
{
return
Directory
.GetFiles(path, "*.bmp")
.ToObservable(scheduler)
.SelectMany(x =>
Observable
.Using(
() => new System.Drawing.Bitmap(x),
bm => Observable.Return(bm)))
.Publish()
.RefCount();
}
但是,您仍然处于可能缺失值的状态。
因此你需要真正做到这一点:
public static IConnectableObservable<System.Drawing.Bitmap> ImagesInFolder(string path, IScheduler scheduler)
{
return
Directory
.GetFiles(path, "*.bmp")
.ToObservable(scheduler)
.SelectMany(x =>
Observable
.Using(
() => new System.Drawing.Bitmap(x),
bm => Observable.Return(bm)))
.Publish();
}
然后你这样称呼它:
public void Main()
{
var images = ImagesInFolder("c:\Users\VASIYA\Desktop\Sample Images", TaskPoolScheduler.Instance);
var process1 = images.Subscribe(SaveBwImages);
var process2 = images.Subscribe(SaveScaledImages);
var process3 = images.Select(Cats).Subscribe(SaveCatsImages);
images.Connect();
}
另一种选择是删除整个 .Publish().RefCount()
代码,并确保您在订阅时自己正确执行。
试试这个代码:
void Main()
{
ImagesInFolder(Scheduler.Default)
.Publish(iif =>
Observable
.Merge(
iif.Select(x => { Thread.Sleep(1000); Console.WriteLine("A"); return "A"; }),
iif.Select(x => { Thread.Sleep(3000); Console.WriteLine("B"); return "B"; }),
iif.Select(x => { Thread.Sleep(2000); Console.WriteLine("C"); return "C"; })))
.Subscribe();
}
public static IObservable<IDisposable> ImagesInFolder(IScheduler scheduler)
{
return
Observable
.Range(0, 3)
.ObserveOn(Scheduler.Default)
.SelectMany(x =>
Observable
.Using(
() => Disposable.Create(() => Console.WriteLine("Disposed!")),
y => Observable.Return(y)));
}
我明白了:
A B C Disposed! A B C Disposed! A B C Disposed!
同样,每个观察者有运行后一个Disposed!
,但是现在的问题是我改变了每个观察者的处理延迟,但是代码仍然输出的是顺序增加了观察员。问题是 Rx 运行s 每个观察者按顺序排列,每个产生的值按顺序排列。
我希望您认为可以使用 .Publish()
进行并行处理。你没有。
并行地达到 运行 的方法是完全放弃 .Publish()
。
就做这种事情:
void Main()
{
ImagesInFolder(Scheduler.Default).Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("A"); });
ImagesInFolder(Scheduler.Default).Subscribe(x => { Thread.Sleep(3000); Console.WriteLine("B"); });
ImagesInFolder(Scheduler.Default).Subscribe(x => { Thread.Sleep(2000); Console.WriteLine("C"); });
}
public static IObservable<IDisposable> ImagesInFolder(IScheduler scheduler)
{
return
Observable
.Range(0, 3)
.ObserveOn(Scheduler.Default)
.SelectMany(x =>
Observable
.Using(
() => Disposable.Create(() => Console.WriteLine("Disposed!")),
y => Observable.Return(y)));
}
我现在明白了:
A Disposed! C Disposed! A Disposed! B Disposed! A Disposed! C Disposed! C Disposed! B Disposed! B Disposed!
代码现在 运行 并行并尽快完成 - 并在订阅完成时正确处理 IDisposable
。你只是没有得到与每个观察者共享单一一次性资源的乐趣,但你也没有得到所有的行为问题。