如何将阻塞事件转换为 Observable?
How to convert blocking events to Observable?
我正在学习 .net Rx(响应式扩展)库并尝试创建一个合适的 Observable 以从控制台读取用户输入。
到目前为止我是这样的:
public static IObservable<string> ConsoleInputObservable()
{
return Observable.Create<string>(observer =>
{
var cancelable = new BooleanDisposable();
while(!cancelable.IsDisposed)
{
observer.OnNext(Console.ReadLine());
}
observer.OnCompleted();
return cancelable;
});
}
不幸的是,此实现至少有一个问题 - 无法取消订阅。
所以我的问题是:如何正确地将一系列阻塞事件转换为 Observable?
谢谢。
编辑:拼写错误
给你。
注意几点:
- 这允许用户提供适当的调度程序来控制并发的位置 运行,并让出每个循环以防止它变得太混乱(尽管在控制台上等待显然很糟糕... )
- 我们不应在此示例中调用
OnCompleted
,因为终止的唯一方法是取消订阅 - 并且您努力不再发送消息 post 取消
- 我们也不在此处发送
OnNext
post 取消。
代码如下:
public static IObservable<string> ConsoleInputObservable(
IScheduler scheduler = null)
{
scheduler = scheduler ?? Scheduler.Default;
return Observable.Create<string>(o =>
{
return scheduler.ScheduleAsync(async (ctrl, ct) =>
{
while(!ct.IsCancellationRequested)
{
var next = Console.ReadLine();
if(ct.IsCancellationRequested)
return;
o.OnNext(next);
await ctrl.Yield();
}
});
});
}
附录
@MartinLiversage 评论说,多个订阅者的行为是不可取的——这促使了这个附录。您可以简单地 Publish()
上面的代码,但是鉴于控制台的性质是一个应用程序只有一个,并且一次只能有一个线程读取它,因此需要一种不同的方法。
我在上面忽略了这一点,因为我觉得这个问题可能更多地是关于线程方面而不是控制台的性质。如果您真的对在控制台输入的报告行感兴趣,像下面这样的某种主循环可能会更实用——这代表 Subject
.
的合理使用
static void Main()
{
Subject<string> sc = new Subject<string>();
// kick off subscriptions here...
// Perhaps with `ObserveOn` if background processing is required
sc.Subscribe(x => Console.WriteLine("Subscriber1: " + x));
sc.Subscribe(x => Console.WriteLine("Subscriber2: " + x));
string input;
while((input = Console.ReadLine()) != "q")
{
sc.OnNext(input);
}
sc.OnCompleted();
Console.WriteLine("Finished");
}
我正在学习 .net Rx(响应式扩展)库并尝试创建一个合适的 Observable 以从控制台读取用户输入。
到目前为止我是这样的:
public static IObservable<string> ConsoleInputObservable()
{
return Observable.Create<string>(observer =>
{
var cancelable = new BooleanDisposable();
while(!cancelable.IsDisposed)
{
observer.OnNext(Console.ReadLine());
}
observer.OnCompleted();
return cancelable;
});
}
不幸的是,此实现至少有一个问题 - 无法取消订阅。
所以我的问题是:如何正确地将一系列阻塞事件转换为 Observable?
谢谢。
编辑:拼写错误
给你。
注意几点:
- 这允许用户提供适当的调度程序来控制并发的位置 运行,并让出每个循环以防止它变得太混乱(尽管在控制台上等待显然很糟糕... )
- 我们不应在此示例中调用
OnCompleted
,因为终止的唯一方法是取消订阅 - 并且您努力不再发送消息 post 取消 - 我们也不在此处发送
OnNext
post 取消。
代码如下:
public static IObservable<string> ConsoleInputObservable(
IScheduler scheduler = null)
{
scheduler = scheduler ?? Scheduler.Default;
return Observable.Create<string>(o =>
{
return scheduler.ScheduleAsync(async (ctrl, ct) =>
{
while(!ct.IsCancellationRequested)
{
var next = Console.ReadLine();
if(ct.IsCancellationRequested)
return;
o.OnNext(next);
await ctrl.Yield();
}
});
});
}
附录
@MartinLiversage 评论说,多个订阅者的行为是不可取的——这促使了这个附录。您可以简单地 Publish()
上面的代码,但是鉴于控制台的性质是一个应用程序只有一个,并且一次只能有一个线程读取它,因此需要一种不同的方法。
我在上面忽略了这一点,因为我觉得这个问题可能更多地是关于线程方面而不是控制台的性质。如果您真的对在控制台输入的报告行感兴趣,像下面这样的某种主循环可能会更实用——这代表 Subject
.
static void Main()
{
Subject<string> sc = new Subject<string>();
// kick off subscriptions here...
// Perhaps with `ObserveOn` if background processing is required
sc.Subscribe(x => Console.WriteLine("Subscriber1: " + x));
sc.Subscribe(x => Console.WriteLine("Subscriber2: " + x));
string input;
while((input = Console.ReadLine()) != "q")
{
sc.OnNext(input);
}
sc.OnCompleted();
Console.WriteLine("Finished");
}