使 IObservable 订阅并发
Make an IObservable subscription concurrent
我有以下代码
string dataDirectory = _settingsProvider.DataSettings.BaseDirectory;
_solverManagementService.MergedPointCloudProducer(dataDirectory, cancellationToken)
.Subscribe(PointCloudMergerCompleted);
其中 SolverManagementService _solverManagementService
是
Public class SolverManagementService : ISolverManagementService
{
public IObservable<IPointCloud> MergedPointCloudProducer(string dataDirectory,
CancellationToken token)
{
return Observable.Create<IPointCloud>(
observer =>
{
PairCollectionProducer(dataDirectory, token)
.Subscribe(pairCollection =>
{
observer.OnNext(_icpBatchSolverService.RecursivelyMergeAsync(
pairCollection, token));
},
onCompleted: () =>
{
observer.OnCompleted();
});
return () => { };
});
}
... // Other methods.
}
但是这里 _icpBatchSolverService.RecursivelyMergeAsync(pairCollection, token)
很昂贵,虽然这个 returns 一个 Task<IPointCloud>
我没有线程化这个和这个调用块。由于RecursivelyMergeAsync
returns一个Task<IPointCloud>
可以等待,所以我修改了代码使用async/await
public IObservable<IPointCloud> MergedPointCloudProducer(string dataDirectory,
CancellationToken token)
{
return Observable.Create<IPointCloud>(
observer =>
{
PairCollectionProducer(dataDirectory, token)
.Subscribe(async (pairCollection) =>
{
observer.OnNext(await _icpBatchSolverService.RecursivelyMergeAsync(
pairCollection, token));
},
onCompleted: () =>
{
observer.OnCompleted();
});
return () => { };
});
}
但现在它会立即 returns 并且控制台应用程序会关闭。我确信这可以在不需要 Semephores
的情况下完成,但我是 RX 的新手。我如何为每个返回的 pairCollection
同时将 RecursivelyMergeAsync
配置为 运行 而不会阻塞并在所有递归合并完成时收到通知?
注意。在单元测试中,我执行以下操作
public class IcpBatchSolverServiceTests
{
private Mock<ISettingsProvider> _mockSettingsProvider;
private IIcpBatchSolverService _icpBatchSolverService;
[OneTimeSetUp]
public void Setup()
{
_mockSettingsProvider = new Mock<ISettingsProvider>();
_mockSettingsProvider.Setup(m => m.IcpSolverSettings).Returns(new IcpSolverSettings());
_mockSettingsProvider.Object.IcpSolverSettings.MaximumDegreeOfParallelism = 6;
Log.Logger = new LoggerConfiguration()
.WriteTo.Console()
.CreateLogger();
var serviceProvider = new ServiceCollection()
.AddLogging(builder =>
{
builder.SetMinimumLevel(LogLevel.Trace);
builder.AddSerilog(Log.Logger);
})
.BuildServiceProvider();
ILogger<IcpBatchSolverServiceTests> logger = serviceProvider
.GetService<ILoggerFactory>()
.CreateLogger<IcpBatchSolverServiceTests>();
_icpBatchSolverService = new IcpBatchSolverService(_mockSettingsProvider.Object, logger);
}
[Test]
public async Task CanSolveBatchAsync()
{
IPointCloud @static = PointCloudFactory.GetRandomPointCloud(1000);
List<IPointCloud> pointCloudList = PointCloudFactory.GenerateRandomlyRotatedBatch(@static, 12);
IPartitioningService<IPointCloud> ps = new PointCloudPartitioningService();
IPointCloud result = await _icpBatchSolverService.RecursivelyMergeAsync(ps.Partition(pointCloudList), CancellationToken.None);
Assert.AreEqual(@static.Vertices.Length, result.Vertices.Length);
}
}
并且这个过程完美地同时进行。
编辑。概述当为具有命名约定的不同几何图形(不同角度的不同几何图形的深度图)提供文件文件夹时,我需要执行哪些处理。NNNN.exr 其中 NNNN 是某个数值。对于一批文件。
- 使用不同几何形状的文件名将这些文件分批放入集合中。
foreach 文件批次
- [*Serial*] 调用 C++ API 从图像文件中提取深度图。
- [*并行*] 将
DepthMaps
转换为 PointClouds
。这可以一次完成。
- [*并行*] 使用 ICP 算法合并 PointClouds(昂贵)但使用
TaskScheduler
将并发限制为两个线程(根据机器 architecture/memory 等选择)
最后,我使用步骤 3 中的合并点云再次调用 C++ API。所以在 RX 中,我当前的完整管道看起来像
public class SolverManagementService : ISolverManagementService
{
private readonly IIcpBatchSolverService _icpBatchSolverService;
private readonly IDepthMapToPointCloudAdapter _pointCloudAdapter;
private readonly ILogger<SolverManagementService> _logger;
public SolverManagementService(
IIcpBatchSolverService icpBatchSolverService,
IDepthMapToPointCloudAdapter pointCloudAdapter,
ILogger<SolverManagementService> logger)
{
_icpBatchSolverService = icpBatchSolverService ?? throw new ArgumentNullException("icpBatchSolverService cannot be null");
_pointCloudAdapter = pointCloudAdapter ?? throw new ArgumentNullException("pointCloudAdapter cannot be null");
_logger = logger;
}
public IObservable<IPointCloud> MergedPointCloudProducer(string dataDirectory, CancellationToken token)
{
return Observable.Create<IPointCloud>(
observer =>
{
PairCollectionProducer(dataDirectory, token)
.Subscribe(pairCollection =>
{
observer.OnNext(_icpBatchSolverService.RecursivelyMergeAsync(pairCollection, token).Result);
},
onCompleted: () =>
{
observer.OnCompleted();
});
return () => { };
});
}
public IObservable<PairCollection<IPointCloud>> PairCollectionProducer(string dataDirectory, CancellationToken token)
{
return Observable.Create<PairCollection<IPointCloud>>(
observer =>
{
Parallel.ForEach(
Utils.GetFileBatches(dataDirectory),
(fileBatch) =>
{
var producer = RawDepthMapProducer(fileBatch, token);
ConcurrentBag<IPointCloud> bag = new ConcurrentBag<IPointCloud>();
producer.Subscribe(rawDepthMap =>
{
bag.Add(_pointCloudAdapter.GetPointCloudFromDepthMap(rawDepthMap));
_logger?.LogDebug($"Thread {Thread.CurrentThread.ManagedThreadId}: {bag.Count:N0} PointCloud(s) added to concurrent bag");
},
onCompleted: () =>
{
PointCloudPartitioningService ps = new PointCloudPartitioningService();
observer.OnNext(ps.Partition(bag.ToList()));
_logger?.LogDebug($"Thread {Thread.CurrentThread.ManagedThreadId}: PointCloud PairCollection generated " +
$"for file set \"{Path.GetFileNameWithoutExtension(bag.FirstOrDefault().Source)}\"");
});
});
observer.OnCompleted();
return () => { };
});
}
public IObservable<RawDepthMap> RawDepthMapProducer(List<string> filePaths, CancellationToken token)
{
return Observable.Create<RawDepthMap>(
observer =>
{
int index = 0;
foreach(var filePath in filePaths)
{
token.ThrowIfCancellationRequested();
var extractor = DepthMapExtractorFactory.GetDepthMapExtractor(filePath);
observer.OnNext(extractor.GetDepthMap(filePath, index++));
_logger?.LogDebug($"Thread {Thread.CurrentThread.ManagedThreadId}: DepthMap extracted from \"{filePath}\"");
}
observer.OnCompleted();
return () => { };
});
}
}
我正在寻找:1. 我上面的代码有什么问题请注意 _icpBatchSolverService.RecursivelyMergeAsync
returns a Task<IPointCloud
并且是并发的,我想要这个拖车 运行同时。 2.我的代码还有什么问题?
我将留下一个通用的答案,因为上面的代码太广泛而无法将其简化。
有两种语法可用于定义异步行为。第一个是 async/await
模式,第二个是 Subscribe()
模式(反应式)。
异步和并发是一回事吗?
不,绝对不是。对于那些可能正在阅读这篇文章但不知道的人来说,异步意味着 "it happens later," 而不是 "it happens concurrently." 通过使用这两种语法中的任何一种,您可以定义在满足某些谓词后立即发生的行为。一个非常常见的用例是处理从 Web 服务器返回的响应。您需要发出请求,然后在响应返回时执行某些操作。
并发性不同。例如,您可以使用 Task.Run()
或 Parallel.ForEach()
调用并发。在这两种情况下,您都在定义分叉。在 Task.Run
的情况下,您稍后可能会执行 Task.WaitAll
。在 Parallel.ForEach
的情况下,它将为您执行 fork/join。当然,reactive 有自己的一套 fork/join 操作。
当我等待或订阅时会发生什么?
下面两行代码都有相同的行为,这种行为让很多程序员感到困惑:
var result = await myAsync();
myObservable.Subscribe(result => { ... });
在这两种情况下,程序的控制流都以一种可预测但可能令人困惑的方式移动。在第一种情况下,控制流 return 在等待 await
时返回给父调用者。在第二行中,控制流转到下一行代码,在结果的 return 上调用 lambda 表达式。
我在学习如何使用它们的人中看到的一个常见事情是尝试将 lambda 中的变量分配给父范围中的地址。这是行不通的,因为在执行 lambda 之前,该范围将不复存在。使用 async/await
不太可能做一些愚蠢的事情,但您还必须记住,控制流将上升到调用堆栈,直到定义下一个同步操作。 This article explains it in a little more depth, and this article 比较容易理解。
我有以下代码
string dataDirectory = _settingsProvider.DataSettings.BaseDirectory;
_solverManagementService.MergedPointCloudProducer(dataDirectory, cancellationToken)
.Subscribe(PointCloudMergerCompleted);
其中 SolverManagementService _solverManagementService
是
Public class SolverManagementService : ISolverManagementService
{
public IObservable<IPointCloud> MergedPointCloudProducer(string dataDirectory,
CancellationToken token)
{
return Observable.Create<IPointCloud>(
observer =>
{
PairCollectionProducer(dataDirectory, token)
.Subscribe(pairCollection =>
{
observer.OnNext(_icpBatchSolverService.RecursivelyMergeAsync(
pairCollection, token));
},
onCompleted: () =>
{
observer.OnCompleted();
});
return () => { };
});
}
... // Other methods.
}
但是这里 _icpBatchSolverService.RecursivelyMergeAsync(pairCollection, token)
很昂贵,虽然这个 returns 一个 Task<IPointCloud>
我没有线程化这个和这个调用块。由于RecursivelyMergeAsync
returns一个Task<IPointCloud>
可以等待,所以我修改了代码使用async/await
public IObservable<IPointCloud> MergedPointCloudProducer(string dataDirectory,
CancellationToken token)
{
return Observable.Create<IPointCloud>(
observer =>
{
PairCollectionProducer(dataDirectory, token)
.Subscribe(async (pairCollection) =>
{
observer.OnNext(await _icpBatchSolverService.RecursivelyMergeAsync(
pairCollection, token));
},
onCompleted: () =>
{
observer.OnCompleted();
});
return () => { };
});
}
但现在它会立即 returns 并且控制台应用程序会关闭。我确信这可以在不需要 Semephores
的情况下完成,但我是 RX 的新手。我如何为每个返回的 pairCollection
同时将 RecursivelyMergeAsync
配置为 运行 而不会阻塞并在所有递归合并完成时收到通知?
注意。在单元测试中,我执行以下操作
public class IcpBatchSolverServiceTests
{
private Mock<ISettingsProvider> _mockSettingsProvider;
private IIcpBatchSolverService _icpBatchSolverService;
[OneTimeSetUp]
public void Setup()
{
_mockSettingsProvider = new Mock<ISettingsProvider>();
_mockSettingsProvider.Setup(m => m.IcpSolverSettings).Returns(new IcpSolverSettings());
_mockSettingsProvider.Object.IcpSolverSettings.MaximumDegreeOfParallelism = 6;
Log.Logger = new LoggerConfiguration()
.WriteTo.Console()
.CreateLogger();
var serviceProvider = new ServiceCollection()
.AddLogging(builder =>
{
builder.SetMinimumLevel(LogLevel.Trace);
builder.AddSerilog(Log.Logger);
})
.BuildServiceProvider();
ILogger<IcpBatchSolverServiceTests> logger = serviceProvider
.GetService<ILoggerFactory>()
.CreateLogger<IcpBatchSolverServiceTests>();
_icpBatchSolverService = new IcpBatchSolverService(_mockSettingsProvider.Object, logger);
}
[Test]
public async Task CanSolveBatchAsync()
{
IPointCloud @static = PointCloudFactory.GetRandomPointCloud(1000);
List<IPointCloud> pointCloudList = PointCloudFactory.GenerateRandomlyRotatedBatch(@static, 12);
IPartitioningService<IPointCloud> ps = new PointCloudPartitioningService();
IPointCloud result = await _icpBatchSolverService.RecursivelyMergeAsync(ps.Partition(pointCloudList), CancellationToken.None);
Assert.AreEqual(@static.Vertices.Length, result.Vertices.Length);
}
}
并且这个过程完美地同时进行。
编辑。概述当为具有命名约定的不同几何图形(不同角度的不同几何图形的深度图)提供文件文件夹时,我需要执行哪些处理。NNNN.exr 其中 NNNN 是某个数值。对于一批文件。
- 使用不同几何形状的文件名将这些文件分批放入集合中。
foreach 文件批次
- [*Serial*] 调用 C++ API 从图像文件中提取深度图。
- [*并行*] 将
DepthMaps
转换为PointClouds
。这可以一次完成。 - [*并行*] 使用 ICP 算法合并 PointClouds(昂贵)但使用
TaskScheduler
将并发限制为两个线程(根据机器 architecture/memory 等选择)
最后,我使用步骤 3 中的合并点云再次调用 C++ API。所以在 RX 中,我当前的完整管道看起来像
public class SolverManagementService : ISolverManagementService
{
private readonly IIcpBatchSolverService _icpBatchSolverService;
private readonly IDepthMapToPointCloudAdapter _pointCloudAdapter;
private readonly ILogger<SolverManagementService> _logger;
public SolverManagementService(
IIcpBatchSolverService icpBatchSolverService,
IDepthMapToPointCloudAdapter pointCloudAdapter,
ILogger<SolverManagementService> logger)
{
_icpBatchSolverService = icpBatchSolverService ?? throw new ArgumentNullException("icpBatchSolverService cannot be null");
_pointCloudAdapter = pointCloudAdapter ?? throw new ArgumentNullException("pointCloudAdapter cannot be null");
_logger = logger;
}
public IObservable<IPointCloud> MergedPointCloudProducer(string dataDirectory, CancellationToken token)
{
return Observable.Create<IPointCloud>(
observer =>
{
PairCollectionProducer(dataDirectory, token)
.Subscribe(pairCollection =>
{
observer.OnNext(_icpBatchSolverService.RecursivelyMergeAsync(pairCollection, token).Result);
},
onCompleted: () =>
{
observer.OnCompleted();
});
return () => { };
});
}
public IObservable<PairCollection<IPointCloud>> PairCollectionProducer(string dataDirectory, CancellationToken token)
{
return Observable.Create<PairCollection<IPointCloud>>(
observer =>
{
Parallel.ForEach(
Utils.GetFileBatches(dataDirectory),
(fileBatch) =>
{
var producer = RawDepthMapProducer(fileBatch, token);
ConcurrentBag<IPointCloud> bag = new ConcurrentBag<IPointCloud>();
producer.Subscribe(rawDepthMap =>
{
bag.Add(_pointCloudAdapter.GetPointCloudFromDepthMap(rawDepthMap));
_logger?.LogDebug($"Thread {Thread.CurrentThread.ManagedThreadId}: {bag.Count:N0} PointCloud(s) added to concurrent bag");
},
onCompleted: () =>
{
PointCloudPartitioningService ps = new PointCloudPartitioningService();
observer.OnNext(ps.Partition(bag.ToList()));
_logger?.LogDebug($"Thread {Thread.CurrentThread.ManagedThreadId}: PointCloud PairCollection generated " +
$"for file set \"{Path.GetFileNameWithoutExtension(bag.FirstOrDefault().Source)}\"");
});
});
observer.OnCompleted();
return () => { };
});
}
public IObservable<RawDepthMap> RawDepthMapProducer(List<string> filePaths, CancellationToken token)
{
return Observable.Create<RawDepthMap>(
observer =>
{
int index = 0;
foreach(var filePath in filePaths)
{
token.ThrowIfCancellationRequested();
var extractor = DepthMapExtractorFactory.GetDepthMapExtractor(filePath);
observer.OnNext(extractor.GetDepthMap(filePath, index++));
_logger?.LogDebug($"Thread {Thread.CurrentThread.ManagedThreadId}: DepthMap extracted from \"{filePath}\"");
}
observer.OnCompleted();
return () => { };
});
}
}
我正在寻找:1. 我上面的代码有什么问题请注意 _icpBatchSolverService.RecursivelyMergeAsync
returns a Task<IPointCloud
并且是并发的,我想要这个拖车 运行同时。 2.我的代码还有什么问题?
我将留下一个通用的答案,因为上面的代码太广泛而无法将其简化。
有两种语法可用于定义异步行为。第一个是 async/await
模式,第二个是 Subscribe()
模式(反应式)。
异步和并发是一回事吗?
不,绝对不是。对于那些可能正在阅读这篇文章但不知道的人来说,异步意味着 "it happens later," 而不是 "it happens concurrently." 通过使用这两种语法中的任何一种,您可以定义在满足某些谓词后立即发生的行为。一个非常常见的用例是处理从 Web 服务器返回的响应。您需要发出请求,然后在响应返回时执行某些操作。
并发性不同。例如,您可以使用 Task.Run()
或 Parallel.ForEach()
调用并发。在这两种情况下,您都在定义分叉。在 Task.Run
的情况下,您稍后可能会执行 Task.WaitAll
。在 Parallel.ForEach
的情况下,它将为您执行 fork/join。当然,reactive 有自己的一套 fork/join 操作。
当我等待或订阅时会发生什么?
下面两行代码都有相同的行为,这种行为让很多程序员感到困惑:
var result = await myAsync();
myObservable.Subscribe(result => { ... });
在这两种情况下,程序的控制流都以一种可预测但可能令人困惑的方式移动。在第一种情况下,控制流 return 在等待 await
时返回给父调用者。在第二行中,控制流转到下一行代码,在结果的 return 上调用 lambda 表达式。
我在学习如何使用它们的人中看到的一个常见事情是尝试将 lambda 中的变量分配给父范围中的地址。这是行不通的,因为在执行 lambda 之前,该范围将不复存在。使用 async/await
不太可能做一些愚蠢的事情,但您还必须记住,控制流将上升到调用堆栈,直到定义下一个同步操作。 This article explains it in a little more depth, and this article 比较容易理解。