如何将非一次性对象与冷可观察对象的每个订阅绑定?
How to bind a non-disposable object with each subscription of a cold observable?
抱歉,如果之前有人问过这个问题,但我找不到重复的问题。也不好意思最近问了太多问题!我可能正在寻找一种不限于一次性资源的自定义 Observable.Using
方法。我拥有的是一个冷 IObservable
维护一些内部状态,例如 Random
实例。此实例不应与 IObservable
本身绑定,而应与其每个订阅绑定。每个订阅者应该使用此资源的不同实例。以下面的 GetRandomNumbers
方法为例:
static IObservable<int> GetRandomNumbers()
{
var random = new Random(0);
return Observable
.Interval(TimeSpan.FromMilliseconds(100))
.Select(x => random.Next(1, 10))
.Take(10);
}
此方法生成10个随机数。 RNG 是一个用常量种子初始化的 Random
实例,因此它应该始终产生相同的 10 个数字。但可惜它没有:
var stream = GetRandomNumbers();
Console.WriteLine($"Results A: {String.Join(", ", await stream.ToArray())}");
Console.WriteLine($"Results B: {String.Join(", ", await stream.ToArray())}");
输出:
Results A: 7, 8, 7, 6, 2, 6, 9, 4, 9, 3
Results B: 3, 5, 6, 5, 9, 1, 8, 9, 7, 3
stream
observable 的每个订阅者都会得到一组不同的数字!发生的情况是所有订阅者都使用同一个 Random
实例。这不仅是不可取的,而且还会产生破坏对象内部状态的风险,因为 Random
class is not thread-safe.
我试图解决这个问题是使用 Using
运算符,它有一个 Func<TResource> resourceFactory
参数:
static IObservable<int> GetRandomNumbers()
{
return Observable.Using(() => new Random(0), random =>
Observable
.Interval(TimeSpan.FromMilliseconds(100))
.Select(x => random.Next(1, 10))
.Take(10)
);
}
如果 Random
是一次性的(我用一次性 class 测试了它并按预期工作),这将是一个完美的解决方案,但它不是,所以代码无法编译:
The type 'System.Random' cannot be used as type parameter 'TResource' in the generic type or method 'Observable.Using<TResult, TResource>(Func, Func<TResource, IObservable>)'. There is no implicit reference conversion from 'System.Random' to 'System.IDisposable'.
你能提出解决这个问题的方法吗?
Observable.Defer
是你的朋友,如果你想要每个订阅者的状态。
试试这个:
static IObservable<int> GetRandomNumbers() =>
Observable
.Defer(() =>
{
var random = new Random(0);
return Observable
.Interval(TimeSpan.FromMilliseconds(100))
.Select(x => random.Next(1, 10))
.Take(10);
});
我的结果:
Results A: 7, 8, 7, 6, 2, 6, 9, 4, 9, 3
Results B: 7, 8, 7, 6, 2, 6, 9, 4, 9, 3
抱歉,如果之前有人问过这个问题,但我找不到重复的问题。也不好意思最近问了太多问题!我可能正在寻找一种不限于一次性资源的自定义 Observable.Using
方法。我拥有的是一个冷 IObservable
维护一些内部状态,例如 Random
实例。此实例不应与 IObservable
本身绑定,而应与其每个订阅绑定。每个订阅者应该使用此资源的不同实例。以下面的 GetRandomNumbers
方法为例:
static IObservable<int> GetRandomNumbers()
{
var random = new Random(0);
return Observable
.Interval(TimeSpan.FromMilliseconds(100))
.Select(x => random.Next(1, 10))
.Take(10);
}
此方法生成10个随机数。 RNG 是一个用常量种子初始化的 Random
实例,因此它应该始终产生相同的 10 个数字。但可惜它没有:
var stream = GetRandomNumbers();
Console.WriteLine($"Results A: {String.Join(", ", await stream.ToArray())}");
Console.WriteLine($"Results B: {String.Join(", ", await stream.ToArray())}");
输出:
Results A: 7, 8, 7, 6, 2, 6, 9, 4, 9, 3
Results B: 3, 5, 6, 5, 9, 1, 8, 9, 7, 3
stream
observable 的每个订阅者都会得到一组不同的数字!发生的情况是所有订阅者都使用同一个 Random
实例。这不仅是不可取的,而且还会产生破坏对象内部状态的风险,因为 Random
class is not thread-safe.
我试图解决这个问题是使用 Using
运算符,它有一个 Func<TResource> resourceFactory
参数:
static IObservable<int> GetRandomNumbers()
{
return Observable.Using(() => new Random(0), random =>
Observable
.Interval(TimeSpan.FromMilliseconds(100))
.Select(x => random.Next(1, 10))
.Take(10)
);
}
如果 Random
是一次性的(我用一次性 class 测试了它并按预期工作),这将是一个完美的解决方案,但它不是,所以代码无法编译:
The type 'System.Random' cannot be used as type parameter 'TResource' in the generic type or method 'Observable.Using<TResult, TResource>(Func, Func<TResource, IObservable>)'. There is no implicit reference conversion from 'System.Random' to 'System.IDisposable'.
你能提出解决这个问题的方法吗?
Observable.Defer
是你的朋友,如果你想要每个订阅者的状态。
试试这个:
static IObservable<int> GetRandomNumbers() =>
Observable
.Defer(() =>
{
var random = new Random(0);
return Observable
.Interval(TimeSpan.FromMilliseconds(100))
.Select(x => random.Next(1, 10))
.Take(10);
});
我的结果:
Results A: 7, 8, 7, 6, 2, 6, 9, 4, 9, 3
Results B: 7, 8, 7, 6, 2, 6, 9, 4, 9, 3