Observable.Using 和异步流获取损坏的数据
Observable.Using and async streams getting corrupted data
我有一个流,其目标是计算一组 .zip 文件中内容的简单 "checksum"。
为此,我设置了一个可观察对象:
- 获取给定文件夹中的所有文件
- 读取每个文件的内容(读取为
ZipArchive
)
- 对于每个文件中的每个条目,执行校验和
的计算
为了说明这一点,我创建了这个例子:
注意使用 AsyncContext.Run
() 使 Main
方法等待 GetChecksum
因为它是一个控制台申请
namespace DisposePoC
{
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Reactive.Linq;
using Nito.AsyncEx;
using System.Linq;
using System.Threading.Tasks;
class Program
{
private static void Main()
{
AsyncContext.Run(GetChecksums);
}
private static async Task<IList<byte>> GetChecksums()
{
var bytes = Directory.EnumerateFiles("FolderWithZips")
.ToObservable()
.SelectMany(path => Observable.Using(() => CreateZipArchive(path), archive => archive.Entries.ToObservable()))
.SelectMany(entry => Observable.Using(entry.Open, stream => Observable.FromAsync(() => CalculateChecksum(stream, entry.Length))));
return await bytes.ToList();
}
private static ZipArchive CreateZipArchive(string path)
{
return new ZipArchive(new FileStream(path, FileMode.Open, FileAccess.Read));
}
private static async Task<byte> CalculateChecksum(Stream stream, long entryLength)
{
var bytes = await GetBytesFromStream(stream, entryLength);
return bytes.Aggregate((b1, b2) => (byte) (b1 ^ b2));
}
private static async Task<byte[]> GetBytesFromStream(Stream stream, long entryLength)
{
byte[] bytes = new byte[entryLength];
await stream.ReadAsync(bytes, 0, (int)entryLength);
return bytes;
}
}
}
运行应用程序,出现各种错误:
'System.IO.InvalidDataException': A local file header is corrupt.
'System.NotSupportedException': Stream does not support reading.
'System.ObjectDisposedException' : Cannot access a disposed object.
'System.IO.InvalidDataException' : Block length does not match with its complement.
我做错了什么?
Observable 本身有问题还是因为 ZipArchive
不是线程安全的?如果不是,我该如何使代码工作?
您的问题似乎与"Rx"无关。
如果你 mod 将整个事情都变成一组命令式循环,它就可以正常工作
private static async Task<IList<byte>> GetChecksums()
{
var bytes = new List<byte>();
foreach (var path in Directory.EnumerateFiles("FolderWithZips"))
{
using (var archive = CreateZipArchive(path))
{
foreach (var entry in archive.Entries)
{
using (var stream = entry.Open())
{
var checksum = await CalculateChecksum(stream, entry.Length);
bytes.Add(checksum);
}
}
}
}
return bytes;
}
所以我想你有一组竞争条件(并发)and/or乱序处理问题。
Rx 可能不是最适合这个的。老实说,你甚至可以在没有异步的情况下做到这一点。
Directory.EnumerateFiles("FolderWithZips")
.AsParallel()
.Select(folder => CalculateChecksum(folder))
.ToList()
我有一个流,其目标是计算一组 .zip 文件中内容的简单 "checksum"。
为此,我设置了一个可观察对象:
- 获取给定文件夹中的所有文件
- 读取每个文件的内容(读取为
ZipArchive
) - 对于每个文件中的每个条目,执行校验和 的计算
为了说明这一点,我创建了这个例子:
注意使用 AsyncContext.Run
() 使 Main
方法等待 GetChecksum
因为它是一个控制台申请
namespace DisposePoC
{
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Reactive.Linq;
using Nito.AsyncEx;
using System.Linq;
using System.Threading.Tasks;
class Program
{
private static void Main()
{
AsyncContext.Run(GetChecksums);
}
private static async Task<IList<byte>> GetChecksums()
{
var bytes = Directory.EnumerateFiles("FolderWithZips")
.ToObservable()
.SelectMany(path => Observable.Using(() => CreateZipArchive(path), archive => archive.Entries.ToObservable()))
.SelectMany(entry => Observable.Using(entry.Open, stream => Observable.FromAsync(() => CalculateChecksum(stream, entry.Length))));
return await bytes.ToList();
}
private static ZipArchive CreateZipArchive(string path)
{
return new ZipArchive(new FileStream(path, FileMode.Open, FileAccess.Read));
}
private static async Task<byte> CalculateChecksum(Stream stream, long entryLength)
{
var bytes = await GetBytesFromStream(stream, entryLength);
return bytes.Aggregate((b1, b2) => (byte) (b1 ^ b2));
}
private static async Task<byte[]> GetBytesFromStream(Stream stream, long entryLength)
{
byte[] bytes = new byte[entryLength];
await stream.ReadAsync(bytes, 0, (int)entryLength);
return bytes;
}
}
}
运行应用程序,出现各种错误:
'System.IO.InvalidDataException': A local file header is corrupt. 'System.NotSupportedException': Stream does not support reading. 'System.ObjectDisposedException' : Cannot access a disposed object. 'System.IO.InvalidDataException' : Block length does not match with its complement.
我做错了什么?
Observable 本身有问题还是因为 ZipArchive
不是线程安全的?如果不是,我该如何使代码工作?
您的问题似乎与"Rx"无关。
如果你 mod 将整个事情都变成一组命令式循环,它就可以正常工作
private static async Task<IList<byte>> GetChecksums()
{
var bytes = new List<byte>();
foreach (var path in Directory.EnumerateFiles("FolderWithZips"))
{
using (var archive = CreateZipArchive(path))
{
foreach (var entry in archive.Entries)
{
using (var stream = entry.Open())
{
var checksum = await CalculateChecksum(stream, entry.Length);
bytes.Add(checksum);
}
}
}
}
return bytes;
}
所以我想你有一组竞争条件(并发)and/or乱序处理问题。
Rx 可能不是最适合这个的。老实说,你甚至可以在没有异步的情况下做到这一点。
Directory.EnumerateFiles("FolderWithZips")
.AsParallel()
.Select(folder => CalculateChecksum(folder))
.ToList()