使 IObservable 订阅并发

Make an IObservable subscription concurrent

我有以下代码

string dataDirectory = _settingsProvider.DataSettings.BaseDirectory;
_solverManagementService.MergedPointCloudProducer(dataDirectory, cancellationToken)
    .Subscribe(PointCloudMergerCompleted);

其中 SolverManagementService _solverManagementService

Public class SolverManagementService : ISolverManagementService
{
    public IObservable<IPointCloud> MergedPointCloudProducer(string dataDirectory,
        CancellationToken token)
    {
        return Observable.Create<IPointCloud>(
            observer =>
            {
                PairCollectionProducer(dataDirectory, token)
                    .Subscribe(pairCollection =>
                    {
                        observer.OnNext(_icpBatchSolverService.RecursivelyMergeAsync(
                            pairCollection, token));
                    },
                    onCompleted: () =>
                    {
                        observer.OnCompleted();
                    });
                return () => { };
            });
    }
    ... // Other methods. 
}

但是这里 _icpBatchSolverService.RecursivelyMergeAsync(pairCollection, token) 很昂贵,虽然这个 returns 一个 Task<IPointCloud> 我没有线程化这个和这个调用块。由于RecursivelyMergeAsyncreturns一个Task<IPointCloud>可以等待,所以我修改了代码使用async/await

public IObservable<IPointCloud> MergedPointCloudProducer(string dataDirectory,
    CancellationToken token)
{
    return Observable.Create<IPointCloud>(
        observer =>
        {
            PairCollectionProducer(dataDirectory, token)
                .Subscribe(async (pairCollection) =>
                {
                    observer.OnNext(await _icpBatchSolverService.RecursivelyMergeAsync(
                        pairCollection, token));
                },
                onCompleted: () =>
                {
                    observer.OnCompleted();
                });
            return () => { };
        });
}

但现在它会立即 returns 并且控制台应用程序会关闭。我确信这可以在不需要 Semephores 的情况下完成,但我是 RX 的新手。我如何为每个返回的 pairCollection 同时将 RecursivelyMergeAsync 配置为 运行 而不会阻塞并在所有递归合并完成时收到通知?

注意。在单元测试中,我执行以下操作

public class IcpBatchSolverServiceTests
{
    private Mock<ISettingsProvider> _mockSettingsProvider; 
    private IIcpBatchSolverService _icpBatchSolverService;

    [OneTimeSetUp]
    public void Setup()
    {
        _mockSettingsProvider = new Mock<ISettingsProvider>();

        _mockSettingsProvider.Setup(m => m.IcpSolverSettings).Returns(new IcpSolverSettings());
        _mockSettingsProvider.Object.IcpSolverSettings.MaximumDegreeOfParallelism = 6;

        Log.Logger = new LoggerConfiguration()
            .WriteTo.Console()
            .CreateLogger();

        var serviceProvider = new ServiceCollection()
            .AddLogging(builder =>
            {
                builder.SetMinimumLevel(LogLevel.Trace);
                builder.AddSerilog(Log.Logger);
            })
            .BuildServiceProvider();

        ILogger<IcpBatchSolverServiceTests> logger = serviceProvider
            .GetService<ILoggerFactory>()
            .CreateLogger<IcpBatchSolverServiceTests>();

        _icpBatchSolverService = new IcpBatchSolverService(_mockSettingsProvider.Object, logger);
    }

    [Test]
    public async Task CanSolveBatchAsync()
    {
        IPointCloud @static = PointCloudFactory.GetRandomPointCloud(1000);
        List<IPointCloud> pointCloudList = PointCloudFactory.GenerateRandomlyRotatedBatch(@static, 12);

        IPartitioningService<IPointCloud> ps = new PointCloudPartitioningService();
        IPointCloud result = await _icpBatchSolverService.RecursivelyMergeAsync(ps.Partition(pointCloudList), CancellationToken.None);

        Assert.AreEqual(@static.Vertices.Length, result.Vertices.Length);
    }
}

并且这个过程完美地同时进行。


编辑。概述当为具有命名约定的不同几何图形(不同角度的不同几何图形的深度图)提供文件文件夹时,我需要执行哪些处理。NNNN.exr 其中 NNNN 是某个数值。对于一批文件。

  1. 使用不同几何形状的文件名将这些文件分批放入集合中。

foreach 文件批次

  1. [*Serial*] 调用 C++ API 从图像文件中提取深度图。
  2. [*并行*] 将 DepthMaps 转换为 PointClouds。这可以一次完成。
  3. [*并行*] 使用 ICP 算法合并 PointClouds(昂贵)但使用 TaskScheduler 将并发限制为两个线程(根据机器 architecture/memory 等选择)

最后,我使用步骤 3 中的合并点云再次调用 C++ API。所以在 RX 中,我当前的完整管道看起来像

public class SolverManagementService : ISolverManagementService
{
    private readonly IIcpBatchSolverService _icpBatchSolverService;
    private readonly IDepthMapToPointCloudAdapter _pointCloudAdapter;
    private readonly ILogger<SolverManagementService> _logger;

    public SolverManagementService(
        IIcpBatchSolverService icpBatchSolverService,
        IDepthMapToPointCloudAdapter pointCloudAdapter,
        ILogger<SolverManagementService> logger)
    {
        _icpBatchSolverService = icpBatchSolverService ?? throw new ArgumentNullException("icpBatchSolverService cannot be null");
        _pointCloudAdapter = pointCloudAdapter ?? throw new ArgumentNullException("pointCloudAdapter cannot be null");
        _logger = logger; 
    }

    public IObservable<IPointCloud> MergedPointCloudProducer(string dataDirectory, CancellationToken token)
    {
        return Observable.Create<IPointCloud>(
            observer =>
            {
                PairCollectionProducer(dataDirectory, token)
                    .Subscribe(pairCollection =>
                    {
                        observer.OnNext(_icpBatchSolverService.RecursivelyMergeAsync(pairCollection, token).Result);
                    },
                    onCompleted: () =>
                    {
                        observer.OnCompleted();
                    });
                return () => { };
            });
    }

    public IObservable<PairCollection<IPointCloud>> PairCollectionProducer(string dataDirectory, CancellationToken token)
    {
        return Observable.Create<PairCollection<IPointCloud>>(
            observer =>
            {
                Parallel.ForEach(
                    Utils.GetFileBatches(dataDirectory), 
                    (fileBatch) =>
                {
                    var producer = RawDepthMapProducer(fileBatch, token);
                    ConcurrentBag<IPointCloud> bag = new ConcurrentBag<IPointCloud>();

                    producer.Subscribe(rawDepthMap =>
                    {
                        bag.Add(_pointCloudAdapter.GetPointCloudFromDepthMap(rawDepthMap));
                        _logger?.LogDebug($"Thread {Thread.CurrentThread.ManagedThreadId}: {bag.Count:N0} PointCloud(s) added to concurrent bag");
                    }, 
                    onCompleted: () =>
                    {
                        PointCloudPartitioningService ps = new PointCloudPartitioningService();
                        observer.OnNext(ps.Partition(bag.ToList()));

                        _logger?.LogDebug($"Thread {Thread.CurrentThread.ManagedThreadId}: PointCloud PairCollection generated " +
                            $"for file set \"{Path.GetFileNameWithoutExtension(bag.FirstOrDefault().Source)}\"");
                    });
                });
                observer.OnCompleted();
                return () => { };
            });
    }

    public IObservable<RawDepthMap> RawDepthMapProducer(List<string> filePaths, CancellationToken token)
    {
        return Observable.Create<RawDepthMap>(
            observer =>
            {
                int index = 0;
                foreach(var filePath in filePaths)
                {
                    token.ThrowIfCancellationRequested();
                    var extractor = DepthMapExtractorFactory.GetDepthMapExtractor(filePath);

                    observer.OnNext(extractor.GetDepthMap(filePath, index++));
                    _logger?.LogDebug($"Thread {Thread.CurrentThread.ManagedThreadId}: DepthMap extracted from \"{filePath}\"");
                }
                observer.OnCompleted();
                return () => { };
            });
    }
}

我正在寻找:1. 我上面的代码有什么问题请注意 _icpBatchSolverService.RecursivelyMergeAsync returns a Task<IPointCloud 并且是并发的,我想要这个拖车 运行同时。 2.我的代码还有什么问题?

我将留下一个通用的答案,因为上面的代码太广泛而无法将其简化。

有两种语法可用于定义异步行为。第一个是 async/await 模式,第二个是 Subscribe() 模式(反应式)。

异步和并发是一回事吗?

不,绝对不是。对于那些可能正在阅读这篇文章但不知道的人来说,异步意味着 "it happens later," 而不是 "it happens concurrently." 通过使用这两种语法中的任何一种,您可以定义在满足某些谓词后立即发生的行为。一个非常常见的用例是处理从 Web 服务器返回的响应。您需要发出请求,然后在响应返回时执行某些操作。

并发性不同。例如,您可以使用 Task.Run()Parallel.ForEach() 调用并发。在这两种情况下,您都在定义分叉。在 Task.Run 的情况下,您稍后可能会执行 Task.WaitAll。在 Parallel.ForEach 的情况下,它将为您执行 fork/join。当然,reactive 有自己的一套 fork/join 操作。

当我等待或订阅时会发生什么?

下面两行代码都有相同的行为,这种行为让很多程序员感到困惑:

var result = await myAsync();

myObservable.Subscribe(result => { ... });

在这两种情况下,程序的控制流都以一种可预测但可能令人困惑的方式移动。在第一种情况下,控制流 return 在等待 await 时返回给父调用者。在第二行中,控制流转到下一行代码,在结果的 return 上调用 lambda 表达式。

我在学习如何使用它们的人中看到的一个常见事情是尝试将 lambda 中的变量分配给父范围中的地址。这是行不通的,因为在执行 lambda 之前,该范围将不复存在。使用 async/await 不太可能做一些愚蠢的事情,但您还必须记住,控制流将上升到调用堆栈,直到定义下一个同步操作。 This article explains it in a little more depth, and this article 比较容易理解。