停止所有后续调用以订阅响应式扩展
Stopping all subsequent calls to subscribe in reactive extensions
我正在用响应式扩展替换我以前编写的一些代码。
之前我使用 File.ReadAllLines(FileName) 然后遍历文件的所有行,在这个循环中的某个时刻,我跳出循环并对处理过的记录执行一些操作(现在我想在 Subscribe 方法的 OnCompleted 动作中做。
现在我正在尝试使用这个
var observableSequence = File.ReadAllLines(FileName).ToObservable();
observableSequence.Subscribe(u=>
{
//doing some thing but at some point I need to stop receving further calls and go for the OnCompleted action.
});
我知道有一个 If 条件的廉价 hack,在该条件之后不会满足,但我仍然不想浪费我的 CPU 周期来处理不必要的事件。
我知道停止这样做的唯一方法是处理订阅,但此时我正在订阅之间,这是不可能的。
那么有什么方法可以让我在某个时刻忽略所有事件并直接跳转到 OnCompleted。
由于您需要某种手动方式来停止 observable,您可以试试这个:
var signal = new Subject<Unit>();
var observableSequence =
File
.ReadAllLines(FileName)
.ToObservable(Scheduler.Default)
.TakeUntil(signal);
var count = 0;
observableSequence.Subscribe(u=>
{
if (++count == 100)
{
signal.OnNext(Unit.Default);
}
}, () => Console.WriteLine(count));
Console.WriteLine(count);
不过,最好将其作为查询的一部分进行。像这样:
var observableSequence =
File
.ReadAllLines(FileName)
.ToObservable(Scheduler.Default)
.TakeWhile((x, n) => n < 10);
var count = 0;
observableSequence.Subscribe(u => ++count, () => Console.WriteLine(count));
Console.WriteLine(count);
我正在用响应式扩展替换我以前编写的一些代码。
之前我使用 File.ReadAllLines(FileName) 然后遍历文件的所有行,在这个循环中的某个时刻,我跳出循环并对处理过的记录执行一些操作(现在我想在 Subscribe 方法的 OnCompleted 动作中做。
现在我正在尝试使用这个
var observableSequence = File.ReadAllLines(FileName).ToObservable();
observableSequence.Subscribe(u=>
{
//doing some thing but at some point I need to stop receving further calls and go for the OnCompleted action.
});
我知道有一个 If 条件的廉价 hack,在该条件之后不会满足,但我仍然不想浪费我的 CPU 周期来处理不必要的事件。
我知道停止这样做的唯一方法是处理订阅,但此时我正在订阅之间,这是不可能的。
那么有什么方法可以让我在某个时刻忽略所有事件并直接跳转到 OnCompleted。
由于您需要某种手动方式来停止 observable,您可以试试这个:
var signal = new Subject<Unit>();
var observableSequence =
File
.ReadAllLines(FileName)
.ToObservable(Scheduler.Default)
.TakeUntil(signal);
var count = 0;
observableSequence.Subscribe(u=>
{
if (++count == 100)
{
signal.OnNext(Unit.Default);
}
}, () => Console.WriteLine(count));
Console.WriteLine(count);
不过,最好将其作为查询的一部分进行。像这样:
var observableSequence =
File
.ReadAllLines(FileName)
.ToObservable(Scheduler.Default)
.TakeWhile((x, n) => n < 10);
var count = 0;
observableSequence.Subscribe(u => ++count, () => Console.WriteLine(count));
Console.WriteLine(count);