可观察集合 OnNext 未触发
Observable collection OnNext not firing
我有这个非常简单的可观察集合,OnNext 没有触发。
List<int> intList = new List<int>(){1,2,3};
IObservable<int> observableList = intList.ToObservable();
IDisposable subscription = observableList.Subscribe(
x => Console.WriteLine("Received {0} from source.", x),
ex => Console.WriteLine( "OnError: " + ex.Message ),
( ) => Console.WriteLine( "OnCompleted" )
);
intList.Add(4);
我得到的输出如下。
从来源收到 1 个。
从来源收到 2 个。
从来源收到 3 个。
已完成
我将 4 添加到列表后,我期待 "Received 4 from source."。
有人可以指出我哪里做错了吗?我是新的 Rx
这完全取决于您的操作顺序。
如果您的代码结构如下:
List<int> intList = new List<int>() { 1, 2, 3 };
IObservable<int> observableList = intList.ToObservable();
intList.Add(4);
IDisposable subscription =
observableList
.Subscribe(
x => Console.WriteLine("Received {0} from source.", x),
ex => Console.WriteLine("OnError: " + ex.Message),
() => Console.WriteLine("OnCompleted"));
...那么它就如你所愿。
问题是 .Subscribe
在 .ToObservable()
的当前线程上是 运行。实际代码 运行 是 return (IObservable<TSource>) new ToObservable<TSource>(source, SchedulerDefaults.Iteration);
。 SchedulerDefaults.Iteration
是当前线程。
你可以用这段代码看到:
List<int> intList = new List<int>() { 1, 2, 3 };
IObservable<int> observableList = intList.ToObservable();
Console.WriteLine("Before Subscription");
IDisposable subscription =
observableList
.Subscribe(
x => Console.WriteLine("Received {0} from source.", x),
ex => Console.WriteLine("OnError: " + ex.Message),
() => Console.WriteLine("OnCompleted"));
Console.WriteLine("After Subscription, Before Add");
intList.Add(4);
Console.WriteLine("After Add");
当我 运行 它时,我得到:
Before Subscription
Received 1 from source.
Received 2 from source.
Received 3 from source.
OnCompleted
After Subscription, Before Add
After Add
所以 .Add
直到订阅完成后才发生。
现在,如果我尝试通过将代码更改为 intList.ToObservable(Scheduler.Default)
来解决这个问题,那么我会遇到一个新问题。 运行 我上面的代码我得到这个:
Before Subscription
After Subscription, Before Add
After Add
Received 1 from source.
OnError: Collection was modified; enumeration operation may not execute.
很明显,我们遇到了并发问题。您不应该尝试同时操作集合和迭代它们。
这仅仅是因为 List 上的 .ToObservable() 只会在您每次订阅时为您提供列表中的当前项目,而不会持续通知添加的项目。对于实现 IEnumerable 的只读集合也可以这样说。
您可以改用其他集合,它们将按预期工作。
例如可观察集合
或者,您可以找到任何提供更改通知的集合类型(collection.Added += 等...)并使用 Observable.FromEvent 连接后续通知。
同样值得理解的是 IEnumerable.ToObservable 是一个冷可观察量,因此,为什么订阅顺序也很重要(根据第一个答案)。
我有这个非常简单的可观察集合,OnNext 没有触发。
List<int> intList = new List<int>(){1,2,3};
IObservable<int> observableList = intList.ToObservable();
IDisposable subscription = observableList.Subscribe(
x => Console.WriteLine("Received {0} from source.", x),
ex => Console.WriteLine( "OnError: " + ex.Message ),
( ) => Console.WriteLine( "OnCompleted" )
);
intList.Add(4);
我得到的输出如下。
从来源收到 1 个。
从来源收到 2 个。
从来源收到 3 个。
已完成
我将 4 添加到列表后,我期待 "Received 4 from source."。
有人可以指出我哪里做错了吗?我是新的 Rx
这完全取决于您的操作顺序。
如果您的代码结构如下:
List<int> intList = new List<int>() { 1, 2, 3 };
IObservable<int> observableList = intList.ToObservable();
intList.Add(4);
IDisposable subscription =
observableList
.Subscribe(
x => Console.WriteLine("Received {0} from source.", x),
ex => Console.WriteLine("OnError: " + ex.Message),
() => Console.WriteLine("OnCompleted"));
...那么它就如你所愿。
问题是 .Subscribe
在 .ToObservable()
的当前线程上是 运行。实际代码 运行 是 return (IObservable<TSource>) new ToObservable<TSource>(source, SchedulerDefaults.Iteration);
。 SchedulerDefaults.Iteration
是当前线程。
你可以用这段代码看到:
List<int> intList = new List<int>() { 1, 2, 3 };
IObservable<int> observableList = intList.ToObservable();
Console.WriteLine("Before Subscription");
IDisposable subscription =
observableList
.Subscribe(
x => Console.WriteLine("Received {0} from source.", x),
ex => Console.WriteLine("OnError: " + ex.Message),
() => Console.WriteLine("OnCompleted"));
Console.WriteLine("After Subscription, Before Add");
intList.Add(4);
Console.WriteLine("After Add");
当我 运行 它时,我得到:
Before Subscription
Received 1 from source.
Received 2 from source.
Received 3 from source.
OnCompleted
After Subscription, Before Add
After Add
所以 .Add
直到订阅完成后才发生。
现在,如果我尝试通过将代码更改为 intList.ToObservable(Scheduler.Default)
来解决这个问题,那么我会遇到一个新问题。 运行 我上面的代码我得到这个:
Before Subscription
After Subscription, Before Add
After Add
Received 1 from source.
OnError: Collection was modified; enumeration operation may not execute.
很明显,我们遇到了并发问题。您不应该尝试同时操作集合和迭代它们。
这仅仅是因为 List 上的 .ToObservable() 只会在您每次订阅时为您提供列表中的当前项目,而不会持续通知添加的项目。对于实现 IEnumerable 的只读集合也可以这样说。
您可以改用其他集合,它们将按预期工作。 例如可观察集合
或者,您可以找到任何提供更改通知的集合类型(collection.Added += 等...)并使用 Observable.FromEvent 连接后续通知。
同样值得理解的是 IEnumerable.ToObservable 是一个冷可观察量,因此,为什么订阅顺序也很重要(根据第一个答案)。