TransformBlock 发布到输出
TransformBlock posting to output
我的情况是我有一个 BufferBlock<Stream>
从外部源接收 Stream
,比方说文件系统或某个 FTP 服务器。这些文件 Stream
将传递到另一个块并进行处理。
唯一的问题是其中一些文件是压缩的,我想在中间添加一个 Block
以在必要时解压缩文件,并创建多个输出 Stream
's对于它的每个条目。
但是我不想使用 TransformBlockMany
,因为这意味着我必须完全接收 ZIP Stream
并立即创建输出 Stream
数组。
我希望此 Block
接收 ZIP Stream
,开始解压缩,并 Push
到下一个流,只要条目准备就绪,这样进程块就可以开始处理只要第一个文件解压,不要等到所有文件都解压。
我该怎么做?
我知道我的问题是无法同时使用 yield
/ async
。但在重构之后,我摆脱了这种需要,并提出了以下(简化)版本:
var block = new TransformManyBlock<Stream, Stream>((input) => {
var archive = new System.IO.Compression.ZipArchive(input, System.IO.Compression.ZipArchiveMode.Read, true);
foreach (ZipArchiveEntry entry in archive.Entries)
{
if (string.IsNullOrWhiteSpace(entry.Name)) //is a folder
continue;
yield return entry.Open();
}
});
您可以使用 predicate linking 块为您的解压缩逻辑设置中间块,这样您就可以检查是否流式传输存档,如下所示:
var buffer = new BufferBlock<Stream>();
var unzipper = new TransformManyBlock<Stream, Stream>(input => { /* unzip here */ });
var processBlock = new ActionBlock<Stream>(input => { /* process streams here */ });
buffer.LinkTo(unzipper, input => /* check is stream a zip archive */);
unzipper.LinkTo(processBlock);
buffer.LinkTo(processBlock);
至于 async
与 yield
一起使用,您可以尝试 GitHub and NuGet 上提供的 AsyncEnumerable
软件包。
我的情况是我有一个 BufferBlock<Stream>
从外部源接收 Stream
,比方说文件系统或某个 FTP 服务器。这些文件 Stream
将传递到另一个块并进行处理。
唯一的问题是其中一些文件是压缩的,我想在中间添加一个 Block
以在必要时解压缩文件,并创建多个输出 Stream
's对于它的每个条目。
但是我不想使用 TransformBlockMany
,因为这意味着我必须完全接收 ZIP Stream
并立即创建输出 Stream
数组。
我希望此 Block
接收 ZIP Stream
,开始解压缩,并 Push
到下一个流,只要条目准备就绪,这样进程块就可以开始处理只要第一个文件解压,不要等到所有文件都解压。
我该怎么做?
我知道我的问题是无法同时使用 yield
/ async
。但在重构之后,我摆脱了这种需要,并提出了以下(简化)版本:
var block = new TransformManyBlock<Stream, Stream>((input) => {
var archive = new System.IO.Compression.ZipArchive(input, System.IO.Compression.ZipArchiveMode.Read, true);
foreach (ZipArchiveEntry entry in archive.Entries)
{
if (string.IsNullOrWhiteSpace(entry.Name)) //is a folder
continue;
yield return entry.Open();
}
});
您可以使用 predicate linking 块为您的解压缩逻辑设置中间块,这样您就可以检查是否流式传输存档,如下所示:
var buffer = new BufferBlock<Stream>();
var unzipper = new TransformManyBlock<Stream, Stream>(input => { /* unzip here */ });
var processBlock = new ActionBlock<Stream>(input => { /* process streams here */ });
buffer.LinkTo(unzipper, input => /* check is stream a zip archive */);
unzipper.LinkTo(processBlock);
buffer.LinkTo(processBlock);
至于 async
与 yield
一起使用,您可以尝试 GitHub and NuGet 上提供的 AsyncEnumerable
软件包。