RX:如何等待订阅者完成?

RX: How to wait for subscribers to finish?

我有一个从 Rest API 页面下载数据的生产者和几个处理页面(例如将它们加载到数据库)的消费者。

我想让生产者和消费者并行工作,这意味着生产者不应该在下载下一个页面之前等待一个页面被消费。每个消费者需要按顺序处理页面。

当所有页面都下载完毕后,主线程应该等待所有消费者完成他们的工作(因为消费可能比生产需要更长的时间)。

我目前的做法是:

我已经创建了一个下载页面的可观察对象,它会在附加消费者订阅者后立即启动。我将订阅者配置为观察 他们自己的并行执行线程。

C# 代码:

IEnumerable<Page> getPages = produce();
var observable = getPages.ToObservable().Publish();

observable
   .ObserveOn(NewThreadScheduler.Default)
   .Subscribe(page => consume1(page));

observable
   .ObserveOn(NewThreadScheduler.Default)
   .Subscribe(page => consume2(page));

observable.Connect();

此实现的问题是主线程可能会在所有页面都处理完并且应用程序停止之前完成。

如何使用 RX 实现此目的?

谢谢!

编辑:

还尝试了以下方法(来自答案):

static void Main(string[] args)
{
    var getPages = Enumerable.Range(0, 10);

    var els1 = new EventLoopScheduler();
    var els2 = new EventLoopScheduler();

    var observable =
        getPages
            .ToObservable()
            .Publish(ps =>
                Observable
                    .Merge(
                        ps.Select(p => Observable.Start(() => consume1(p), els1)),
                        ps.Select(p => Observable.Start(() => consume2(p), els2))));

    observable.Wait();
}

public static void consume1(int p)
{
    Console.WriteLine($"1:{p}");
    Thread.Sleep(200);
}

public static void consume2(int p)
{
    Console.WriteLine($"2:{p}");
    Thread.Sleep(100);
}

observable.Wait() returns 一旦基础可枚举完成产生值。输出为:

1:0
2:0

只是为了证明,如果我们将 getPages 替换为:

var getPages = Enumerable.Range(0, 10)
    .Select(i =>
    {
        Console.WriteLine($"Produced {i}");
        Thread.Sleep(30);
        return i;
    });

那么输出是:

Produced 0
Produced 1
1:0
2:0
Produced 2
Produced 3
Produced 4
2:1
Produced 5
Produced 6
Produced 7
1:1
2:2
Produced 8
Produced 9

我认为这符合您的要求:

var els1 = new EventLoopScheduler();
var els2 = new EventLoopScheduler();

var observable =
    getPages
        .ToObservable()
        .Publish(ps =>
            Observable
                .Merge(
                    ps.SelectMany(p => Observable.Start(() => consume1(p), els1)),
                    ps.SelectMany(p => Observable.Start(() => consume2(p), els2))));

我写了这个测试代码:

var getPages = Enumerable.Range(0, 10);

var els1 = new EventLoopScheduler();
var els2 = new EventLoopScheduler();

var observable =
    getPages
        .ToObservable()
        .Publish(ps =>
            Observable
                .Merge(
                    ps.SelectMany(p => Observable.Start(() => consume1(p), els1)),
                    ps.SelectMany(p => Observable.Start(() => consume2(p), els2))));

observable.Wait();

public void consume1(int p)
{
    Console.WriteLine($"1:{p}");
    Thread.Sleep(200);
}

public void consume2(int p)
{
    Console.WriteLine($"2:{p}");
    Thread.Sleep(100);
}

我得到了这个输出:

1:0
2:0
2:1
1:1
2:2
2:3
1:2
2:4
2:5
1:3
2:6
2:7
1:4
2:8
2:9
1:5
1:6
1:7
1:8
1:9

完成 EventLoopScheduler 个实例后,您应该对它们调用 .Dispose() 以关闭线程。