如何从控制台输入生成 IObservable<string>

How to make an IObservable<string> from console input

我试过按照下面的示例编写控制台可观察对象,但它不起作用。订阅存在一些问题。如何解决这些问题?

static class Program
{
    static async Task Main(string[] args)
    {
        // var observable = Observable.Interval(TimeSpan.FromMilliseconds(1000)).Publish().RefCount(); // works
        // var observable = FromConsole().Publish().RefCount(); // doesn't work
        var observable = FromConsole(); // doesn't work
        observable.Subscribe(Console.WriteLine);
        await Task.Delay(1500);
        observable.Subscribe(Console.WriteLine);
        await new TaskCompletionSource().Task;
    }

    static IObservable<string> FromConsole()
    {
        return Observable.Create<string>(async observer =>
        {
            while (true)
            {
                observer.OnNext(Console.ReadLine());
            }
        });
    }
}

如果我使用 Observable.Interval,它会订阅两次并且我有一个输入有两个输出。如果我使用任何版本的 FromConsole,我有一个订阅和一个阻塞线程。

您需要 return 一个没有发布的可观察对象。然后您可以订阅它并进一步做您的事情。这是一个例子。当我 运行 时,我可以多次读取。

public class Program
{

    static void Main(string[] args)
    {
        FromConsole().Subscribe(x =>
        {
            Console.WriteLine(x);
        });
    }

    static IObservable<string> FromConsole()
    {
        return Observable.Create<string>(async observer =>
        {
            while (true)
            {
                observer.OnNext(Console.ReadLine());
            }
        });
    }
}

问题是 Console.ReadLine 是一个阻塞方法,所以对 FromConsole 序列的订阅会无限期地阻塞,所以永远不会到达 await Task.Delay(1500); 行。您可以通过异步读取控制台来解决此问题,将阻塞调用卸载到 ThreadPool 线程:

static IObservable<string> FromConsole()
{
    return Observable.Create<string>(async observer =>
    {
        while (true)
        {
            observer.OnNext(await Task.Run(() => Console.ReadLine()));
        }
    });
}

您可以查看 this 关于为什么没有比卸载更好的解决方案的问题。

附带说明,在不提供 onError 处理程序的情况下订阅序列 is not a good idea, unless having the process crash with an unhandled exception is an acceptable behavior for your app. It is especially problematic with sequences produced with Observable.Create<T>(async, because it can lead to weird/buggy behavior like this one:

首先,通常最好避免使用 Observable.Create 来创建 observables - 它肯定是为了这个目的而存在的,但它可以创建不像你认为的那样表现的 observables 因为它们阻挡性质。正如您所发现的!

相反,如果可能,请使用内置运算符来创建可观察对象。在这种情况下可以做到这一点。

我的FromConsole版本是这样的:

static IObservable<string> FromConsole() =>
    Observable
        .Defer(() =>
            Observable
                .Start(() => Console.ReadLine()))
        .Repeat();

Observable.Start 实际上就像 Task.Run 对于可观察对象。它无阻塞地为我们调用Console.ReadLine()

Observable.Defer/Repeat 对重复调用 Observable.Start(() => Console.ReadLine())。没有 Defer 它只会调用 Observable.Start 并永远重复 return 一个字符串。

那就解决了。

现在,第二个问题是您想要查看 Console.ReadLine() 两个订阅对 FromConsole() 可观察值的输出值。

由于 Console.ReadLine 的工作方式,您将从每个订阅中获取值,但一次只能获取一个。试试这个代码:

static async Task Main(string[] args)
{
    var observable = FromConsole();
    observable.Select(x => $"1:{x}").Subscribe(Console.WriteLine);
    observable.Select(x => $"2:{x}").Subscribe(Console.WriteLine);
    await new TaskCompletionSource<int>().Task;
}

static IObservable<string> FromConsole() =>
    Observable
        .Defer(() =>
            Observable
                .Start(() => Console.ReadLine()))
        .Repeat();
        

当我 运行 得到这种输出时:

1:ddfd
2:dfff
1:dfsdfs
2:sdffdfd
1:sdfsdfsdf

这是因为每次订阅都会启动对 FromConsole 的新订阅。因此,您对 Console.ReadLine() 进行了两次调用,它们有效地排队,并且每个调用仅获取每个备用输入。因此 12.

之间的交替

所以,要解决这个问题,您只需要 .Publish().RefCount() 运算符对。

试试这个:

static async Task Main(string[] args)
{
    var observable = FromConsole().Publish().RefCount();
    observable.Select(x => $"1:{x}").Subscribe(Console.WriteLine);
    observable.Select(x => $"2:{x}").Subscribe(Console.WriteLine);
    await new TaskCompletionSource<int>().Task;
}

static IObservable<string> FromConsole() =>
    Observable
        .Defer(() =>
            Observable
                .Start(() => Console.ReadLine()))
        .Repeat();
        

我现在得到:

1:Hello
2:Hello
1:World
2:World

简而言之,非阻塞 FromConsole observable 与 .Publish().RefCount() 的使用相结合,使这项工作按照您期望的方式进行。