如何从控制台输入生成 IObservable<string>
How to make an IObservable<string> from console input
我试过按照下面的示例编写控制台可观察对象,但它不起作用。订阅存在一些问题。如何解决这些问题?
static class Program
{
static async Task Main(string[] args)
{
// var observable = Observable.Interval(TimeSpan.FromMilliseconds(1000)).Publish().RefCount(); // works
// var observable = FromConsole().Publish().RefCount(); // doesn't work
var observable = FromConsole(); // doesn't work
observable.Subscribe(Console.WriteLine);
await Task.Delay(1500);
observable.Subscribe(Console.WriteLine);
await new TaskCompletionSource().Task;
}
static IObservable<string> FromConsole()
{
return Observable.Create<string>(async observer =>
{
while (true)
{
observer.OnNext(Console.ReadLine());
}
});
}
}
如果我使用 Observable.Interval
,它会订阅两次并且我有一个输入有两个输出。如果我使用任何版本的 FromConsole
,我有一个订阅和一个阻塞线程。
您需要 return 一个没有发布的可观察对象。然后您可以订阅它并进一步做您的事情。这是一个例子。当我 运行 时,我可以多次读取。
public class Program
{
static void Main(string[] args)
{
FromConsole().Subscribe(x =>
{
Console.WriteLine(x);
});
}
static IObservable<string> FromConsole()
{
return Observable.Create<string>(async observer =>
{
while (true)
{
observer.OnNext(Console.ReadLine());
}
});
}
}
问题是 Console.ReadLine
是一个阻塞方法,所以对 FromConsole
序列的订阅会无限期地阻塞,所以永远不会到达 await Task.Delay(1500);
行。您可以通过异步读取控制台来解决此问题,将阻塞调用卸载到 ThreadPool
线程:
static IObservable<string> FromConsole()
{
return Observable.Create<string>(async observer =>
{
while (true)
{
observer.OnNext(await Task.Run(() => Console.ReadLine()));
}
});
}
您可以查看 this 关于为什么没有比卸载更好的解决方案的问题。
附带说明,在不提供 onError
处理程序的情况下订阅序列 is not a good idea, unless having the process crash with an unhandled exception is an acceptable behavior for your app. It is especially problematic with sequences produced with Observable.Create<T>(async
, because it can lead to weird/buggy behavior like this one: 。
首先,通常最好避免使用 Observable.Create
来创建 observables - 它肯定是为了这个目的而存在的,但它可以创建不像你认为的那样表现的 observables 因为它们阻挡性质。正如您所发现的!
相反,如果可能,请使用内置运算符来创建可观察对象。在这种情况下可以做到这一点。
我的FromConsole
版本是这样的:
static IObservable<string> FromConsole() =>
Observable
.Defer(() =>
Observable
.Start(() => Console.ReadLine()))
.Repeat();
Observable.Start
实际上就像 Task.Run
对于可观察对象。它无阻塞地为我们调用Console.ReadLine()
。
Observable.Defer
/Repeat
对重复调用 Observable.Start(() => Console.ReadLine())
。没有 Defer
它只会调用 Observable.Start
并永远重复 return 一个字符串。
那就解决了。
现在,第二个问题是您想要查看 Console.ReadLine()
两个订阅对 FromConsole()
可观察值的输出值。
由于 Console.ReadLine
的工作方式,您将从每个订阅中获取值,但一次只能获取一个。试试这个代码:
static async Task Main(string[] args)
{
var observable = FromConsole();
observable.Select(x => $"1:{x}").Subscribe(Console.WriteLine);
observable.Select(x => $"2:{x}").Subscribe(Console.WriteLine);
await new TaskCompletionSource<int>().Task;
}
static IObservable<string> FromConsole() =>
Observable
.Defer(() =>
Observable
.Start(() => Console.ReadLine()))
.Repeat();
当我 运行 得到这种输出时:
1:ddfd
2:dfff
1:dfsdfs
2:sdffdfd
1:sdfsdfsdf
这是因为每次订阅都会启动对 FromConsole
的新订阅。因此,您对 Console.ReadLine()
进行了两次调用,它们有效地排队,并且每个调用仅获取每个备用输入。因此 1
和 2
.
之间的交替
所以,要解决这个问题,您只需要 .Publish().RefCount()
运算符对。
试试这个:
static async Task Main(string[] args)
{
var observable = FromConsole().Publish().RefCount();
observable.Select(x => $"1:{x}").Subscribe(Console.WriteLine);
observable.Select(x => $"2:{x}").Subscribe(Console.WriteLine);
await new TaskCompletionSource<int>().Task;
}
static IObservable<string> FromConsole() =>
Observable
.Defer(() =>
Observable
.Start(() => Console.ReadLine()))
.Repeat();
我现在得到:
1:Hello
2:Hello
1:World
2:World
简而言之,非阻塞 FromConsole
observable 与 .Publish().RefCount()
的使用相结合,使这项工作按照您期望的方式进行。
我试过按照下面的示例编写控制台可观察对象,但它不起作用。订阅存在一些问题。如何解决这些问题?
static class Program
{
static async Task Main(string[] args)
{
// var observable = Observable.Interval(TimeSpan.FromMilliseconds(1000)).Publish().RefCount(); // works
// var observable = FromConsole().Publish().RefCount(); // doesn't work
var observable = FromConsole(); // doesn't work
observable.Subscribe(Console.WriteLine);
await Task.Delay(1500);
observable.Subscribe(Console.WriteLine);
await new TaskCompletionSource().Task;
}
static IObservable<string> FromConsole()
{
return Observable.Create<string>(async observer =>
{
while (true)
{
observer.OnNext(Console.ReadLine());
}
});
}
}
如果我使用 Observable.Interval
,它会订阅两次并且我有一个输入有两个输出。如果我使用任何版本的 FromConsole
,我有一个订阅和一个阻塞线程。
您需要 return 一个没有发布的可观察对象。然后您可以订阅它并进一步做您的事情。这是一个例子。当我 运行 时,我可以多次读取。
public class Program
{
static void Main(string[] args)
{
FromConsole().Subscribe(x =>
{
Console.WriteLine(x);
});
}
static IObservable<string> FromConsole()
{
return Observable.Create<string>(async observer =>
{
while (true)
{
observer.OnNext(Console.ReadLine());
}
});
}
}
问题是 Console.ReadLine
是一个阻塞方法,所以对 FromConsole
序列的订阅会无限期地阻塞,所以永远不会到达 await Task.Delay(1500);
行。您可以通过异步读取控制台来解决此问题,将阻塞调用卸载到 ThreadPool
线程:
static IObservable<string> FromConsole()
{
return Observable.Create<string>(async observer =>
{
while (true)
{
observer.OnNext(await Task.Run(() => Console.ReadLine()));
}
});
}
您可以查看 this 关于为什么没有比卸载更好的解决方案的问题。
附带说明,在不提供 onError
处理程序的情况下订阅序列 is not a good idea, unless having the process crash with an unhandled exception is an acceptable behavior for your app. It is especially problematic with sequences produced with Observable.Create<T>(async
, because it can lead to weird/buggy behavior like this one:
首先,通常最好避免使用 Observable.Create
来创建 observables - 它肯定是为了这个目的而存在的,但它可以创建不像你认为的那样表现的 observables 因为它们阻挡性质。正如您所发现的!
相反,如果可能,请使用内置运算符来创建可观察对象。在这种情况下可以做到这一点。
我的FromConsole
版本是这样的:
static IObservable<string> FromConsole() =>
Observable
.Defer(() =>
Observable
.Start(() => Console.ReadLine()))
.Repeat();
Observable.Start
实际上就像 Task.Run
对于可观察对象。它无阻塞地为我们调用Console.ReadLine()
。
Observable.Defer
/Repeat
对重复调用 Observable.Start(() => Console.ReadLine())
。没有 Defer
它只会调用 Observable.Start
并永远重复 return 一个字符串。
那就解决了。
现在,第二个问题是您想要查看 Console.ReadLine()
两个订阅对 FromConsole()
可观察值的输出值。
由于 Console.ReadLine
的工作方式,您将从每个订阅中获取值,但一次只能获取一个。试试这个代码:
static async Task Main(string[] args)
{
var observable = FromConsole();
observable.Select(x => $"1:{x}").Subscribe(Console.WriteLine);
observable.Select(x => $"2:{x}").Subscribe(Console.WriteLine);
await new TaskCompletionSource<int>().Task;
}
static IObservable<string> FromConsole() =>
Observable
.Defer(() =>
Observable
.Start(() => Console.ReadLine()))
.Repeat();
当我 运行 得到这种输出时:
1:ddfd
2:dfff
1:dfsdfs
2:sdffdfd
1:sdfsdfsdf
这是因为每次订阅都会启动对 FromConsole
的新订阅。因此,您对 Console.ReadLine()
进行了两次调用,它们有效地排队,并且每个调用仅获取每个备用输入。因此 1
和 2
.
所以,要解决这个问题,您只需要 .Publish().RefCount()
运算符对。
试试这个:
static async Task Main(string[] args)
{
var observable = FromConsole().Publish().RefCount();
observable.Select(x => $"1:{x}").Subscribe(Console.WriteLine);
observable.Select(x => $"2:{x}").Subscribe(Console.WriteLine);
await new TaskCompletionSource<int>().Task;
}
static IObservable<string> FromConsole() =>
Observable
.Defer(() =>
Observable
.Start(() => Console.ReadLine()))
.Repeat();
我现在得到:
1:Hello
2:Hello
1:World
2:World
简而言之,非阻塞 FromConsole
observable 与 .Publish().RefCount()
的使用相结合,使这项工作按照您期望的方式进行。