.NET 中的 AzureStorage 特定文件夹
AzureStorage specific folder in .NET
我正在为我的项目使用 ASP.NET,我需要计算 BlobStorage (Azure) 上特定文件夹中的文件总数。
我正在尝试计算此文件夹中的所有文件:container_one/year/month/day/hour
目前我尝试过的:
string path = "container_one/year/month/day/hour";
int totalItems = 0;
BlobContainerClient blobFolder = new BlobServiceClient(connectionString).GetBlobContainerClient(path);
foreach(var blob in blobFolder){
totalItems++;
}
Console.WriteLine(totalItems);
结果:
System.AggregateException: 'One or more errors occurred. (The requested URI does not represent any resource on the server.
我做错了什么?我该怎么做?我确信所提供的路径确实存在于服务器上。
----编辑--------
我犯了一个错误,我没有等待加载 bloblFolder。通过在 new BlobServiceClient
之前添加 await
关键字解决了该问题
所以最终的代码是:
string path = "container_one/year/month/day/hour";
int totalItems = 0;
BlobContainerClient blobFolder = await new BlobServiceClient(connectionString).GetBlobContainerClient(path);
foreach(var blob in blobFolder){
totalItems++;
}
Console.WriteLine(totalItems);
方法 .GetBlobContainerClient()
有很多重载,您尝试调用的方法需要传入一个 Uri
所以您应该做的是 .GetBlobContainerClient(new Uri(path))
.
我找到的工作方法:
int totalCount = 0;
string path = $"{container_one}/{year}/{month}/{day}/{hour}/";
var blobFolders = _blobContainerClient.GetBlobsAsync();
await foreach (var file in blobFolders)
{
if (file.Name.Contains(path))
{
totalCount++;
}
}
return totalCount;
备注
这样做会消耗大量性能,但我看不到任何其他选择。如果有人找到解决方案,请与我分享!
为了 并响应他们关于如何提高性能的请求,我将分享我的 Linqpad 文件中的一些代码,这些代码可以以我上次记录的速度迭代 Azure Blob 存储中的 blob 6 分 38 秒内有 670 万个斑点(或每分钟约 100 万个斑点,或每秒 16,666 个斑点 )。
像这样获得惊人的 blob 迭代速度是可行的,但在我的情况下,由于两件事,这是可能的:
- 我正在使用 Content-Addressable-Storage 实践,其中每个 blob 都是不可变的,并且 blob 的名称是其 SHA-256 哈希的 base-16 表示。
- 所以所有 blob-names 的统计分布被平均分配到 16 个 base16 字符“bins”
'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'
。
- 另一个问题是因为迭代 blob 有点 chatty 这意味着无论您的 Internet 连接的吞吐速度如何,您的计算机和 Blob 存储之间的网络延迟都会大大降低速度(因此,使用延迟为 1 毫秒的 100mbps 连接比使用延迟为 100 毫秒的 1Gbps 连接到您的 Azure 存储帐户要好得多)。
- 虽然解决方法很简单:只需在 Azure 中启动一个 VM(你只需要租用一个 VM 一个小时左右,这样你就可以安全地配置一个可能花费 1000 美元/月的强大机器,但是只会花费你不到 5 美元,因为你只需要它一个完整日历月的一小部分)。
正如我所提到的,我的 blob 名称均匀地(随机地)分布在大量“前缀箱”中这一事实是它能够如此快速地迭代它们的核心,但前提是您可以分区你的 blob-names 根据它们的前缀(包括容器名称和虚拟路径)放入大小均匀的容器中,然后它应该 运行 一样快。
- 将以下代码复制并粘贴到 Linqpad 脚本中。
- 您需要添加对
Azure.Storage.Blobs
的 NuGet 引用(它是针对 12.8.4
编写的,您可能需要更新并解决任何重大更改)。
- 此脚本将所有 blob URI(及其 Azure 生成的 MD5 哈希)写入二进制文件以供其他进程使用。
static readonly DirectoryInfo _outputDirectory = new DirectoryInfo( Path.Combine(
Environment.GetFolderPath( Environment.SpecialFolder.Desktop ),
"BlobsDump"
) );
async Task Main() {
_outputDirectory.Create();
// Indexes 8,730,343 blobs in 6m38 seconds (impressive - compared to the few hours it took earlier)
// UPDATE: Using 2-character prefixes brings total time down to 1m54s to enumerate 9m blobs, wow!
BlobContainerClient cc = CreateBlobClient();
await ListBlobsAsync( cc );
}
private static BlobContainerClient CreateBlobClient()
{
const String cs = @"DefaultEndpointsProtocol=https;AccountName=mindyourownbusiness;AccountKey=werenostrangerstoloveyouknowtherulesandsodoiafullcommitmentswhatimthinkingofyouwouldntgetthisfromanyotherguy;BlobEndpoint=https://.blob.core.windows.net/;TableEndpoint=https://.table.core.windows.net/;";
BlobServiceClient c = new BlobServiceClient(cs);
BlobContainerClient cc = c.GetBlobContainerClient("container-name");
return cc;
}
private static readonly Char[] _hexDigits = new[] { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F' };
private static readonly IReadOnlyList<String> _blobNamePrefixes = _hexDigits.SelectMany( hd0 => _hexDigits.Select( hd1 => hd0.ToString() + hd1.ToString() ) ).ToList();
private static async Task ListBlobsAsync( BlobContainerClient cc )
{
ConcurrentDictionary<String,HashSet<String>> blobNamesPerPrefix = new ConcurrentDictionary<String,HashSet<String>>();
foreach( String prefix in _blobNamePrefixes )
{
blobNamesPerPrefix[prefix] = new HashSet<String>();
}
//
Task reportProgressTask = ReportProgressAsync( blobNamesPerPrefix );
List<Task> tasks = _blobNamePrefixes.Select( prefix => ListBlobsAsync( cc: cc, prefix: prefix, dict: blobNamesPerPrefix ) ).ToList();
await Task.WhenAll( tasks ).ConfigureAwait(false);
Int32 total = blobNamesPerPrefix.Values.Sum( s => s.Count );
}
const Double _lastKnownBlobCountApprox = 8925524; // As of 2020-09-07.
private static async Task ReportProgressAsync( ConcurrentDictionary<String,HashSet<String>> countsSoFar, CancellationToken ct = default )
{
var pb = new Util.ProgressBar( "Blobs indexed" );
pb.Dump();
while( true )
{
Int32 total = countsSoFar.Values.Sum( v => v.Count );
pb.Fraction = (Double)total / _lastKnownBlobCountApprox;
pb.Caption = ( $"{total:N0} blobs observed." );
await Task.Delay( 250 );
}
}
private static readonly UTF8Encoding _utf8NoBom = new UTF8Encoding( encoderShouldEmitUTF8Identifier: false, throwOnInvalidBytes: true );
private static async Task ListBlobsAsync( BlobContainerClient cc, String prefix, ConcurrentDictionary<String,HashSet<String>> dict )
{
HashSet<String> blobs = dict[prefix];
const Int32 ONE_MEGABYTE = 1 * 1024 * 1024;
String outputFileName = Path.Combine( _outputDirectory.FullName, prefix + ".dat" );
using( FileStream fs = new FileStream( outputFileName, FileMode.CreateNew, FileAccess.Write, FileShare.None, bufferSize: ONE_MEGABYTE, options: FileOptions.SequentialScan | FileOptions.Asynchronous ) )
using( MemoryStream ms = new MemoryStream( capacity: ONE_MEGABYTE ) )
using( BinaryWriter wtr = new BinaryWriter( ms, _utf8NoBom ) )
{
Stopwatch sw = Stopwatch.StartNew();
await ListBlobsInnerAsync( cc, prefix, blobs, wtr, fs, ms ).ConfigureAwait(false);
( $"Completed list with prefix \"{prefix}\". Blob count: {blobs.Count:N0}. Took {sw.ElapsedMilliseconds:N0}ms." ).Dump();
}
}
private static async Task ListBlobsInnerAsync( BlobContainerClient cc, String prefix, HashSet<String> blobs, BinaryWriter wtr, FileStream fs, MemoryStream ms )
{
Int32 i = 0;
String? continationToken = null;
do
{
System.Collections.Generic.IAsyncEnumerable<Azure.Page<BlobItem>> segment = cc.GetBlobsAsync( prefix: prefix ).AsPages( continationToken );
await foreach( Azure.Page<BlobItem>? page in segment.ConfigureAwait(false) )
{
continationToken = page.ContinuationToken;
if( page.Values.Last().Name[0] > prefix[0] ) break;
lock( blobs )
{
foreach( BlobItem bi in page.Values )
{
if( blobs.Add( bi.Name ) )
{
WriteBlobLine( ref i, bi, wtr );
}
}
}
// Flush:
wtr.Flush();
ms.Flush();
ms.Position = 0;
await ms.CopyToAsync( fs ).ConfigureAwait(false);
await fs.FlushAsync().ConfigureAwait(false);
ms.Position = 0;
ms.SetLength( 0 );
wtr.Seek( 0, SeekOrigin.Begin );
}
}
while( !String.IsNullOrWhiteSpace( continationToken ) );
}
private static void WriteBlobLine( ref Int32 i, BlobItem bi, BinaryWriter wtr )
{
wtr.Write( i );
wtr.Write( bi.Name ); // Length-prefixed string.
if( bi.Properties.ContentHash != null && bi.Properties.ContentHash.Length == 16 )
{
wtr.Write( bi.Properties.ContentHash.Length );
wtr.Write( bi.Properties.ContentHash );
}
else
{
wtr.Write( 0 );
}
i++;
}
我正在为我的项目使用 ASP.NET,我需要计算 BlobStorage (Azure) 上特定文件夹中的文件总数。
我正在尝试计算此文件夹中的所有文件:container_one/year/month/day/hour
目前我尝试过的:
string path = "container_one/year/month/day/hour";
int totalItems = 0;
BlobContainerClient blobFolder = new BlobServiceClient(connectionString).GetBlobContainerClient(path);
foreach(var blob in blobFolder){
totalItems++;
}
Console.WriteLine(totalItems);
结果:
System.AggregateException: 'One or more errors occurred. (The requested URI does not represent any resource on the server.
我做错了什么?我该怎么做?我确信所提供的路径确实存在于服务器上。
----编辑--------
我犯了一个错误,我没有等待加载 bloblFolder。通过在 new BlobServiceClient
await
关键字解决了该问题
所以最终的代码是:
string path = "container_one/year/month/day/hour";
int totalItems = 0;
BlobContainerClient blobFolder = await new BlobServiceClient(connectionString).GetBlobContainerClient(path);
foreach(var blob in blobFolder){
totalItems++;
}
Console.WriteLine(totalItems);
方法 .GetBlobContainerClient()
有很多重载,您尝试调用的方法需要传入一个 Uri
所以您应该做的是 .GetBlobContainerClient(new Uri(path))
.
我找到的工作方法:
int totalCount = 0;
string path = $"{container_one}/{year}/{month}/{day}/{hour}/";
var blobFolders = _blobContainerClient.GetBlobsAsync();
await foreach (var file in blobFolders)
{
if (file.Name.Contains(path))
{
totalCount++;
}
}
return totalCount;
备注
这样做会消耗大量性能,但我看不到任何其他选择。如果有人找到解决方案,请与我分享!
为了
像这样获得惊人的 blob 迭代速度是可行的,但在我的情况下,由于两件事,这是可能的:
- 我正在使用 Content-Addressable-Storage 实践,其中每个 blob 都是不可变的,并且 blob 的名称是其 SHA-256 哈希的 base-16 表示。
- 所以所有 blob-names 的统计分布被平均分配到 16 个 base16 字符“bins”
'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'
。
- 所以所有 blob-names 的统计分布被平均分配到 16 个 base16 字符“bins”
- 另一个问题是因为迭代 blob 有点 chatty 这意味着无论您的 Internet 连接的吞吐速度如何,您的计算机和 Blob 存储之间的网络延迟都会大大降低速度(因此,使用延迟为 1 毫秒的 100mbps 连接比使用延迟为 100 毫秒的 1Gbps 连接到您的 Azure 存储帐户要好得多)。
- 虽然解决方法很简单:只需在 Azure 中启动一个 VM(你只需要租用一个 VM 一个小时左右,这样你就可以安全地配置一个可能花费 1000 美元/月的强大机器,但是只会花费你不到 5 美元,因为你只需要它一个完整日历月的一小部分)。
正如我所提到的,我的 blob 名称均匀地(随机地)分布在大量“前缀箱”中这一事实是它能够如此快速地迭代它们的核心,但前提是您可以分区你的 blob-names 根据它们的前缀(包括容器名称和虚拟路径)放入大小均匀的容器中,然后它应该 运行 一样快。
- 将以下代码复制并粘贴到 Linqpad 脚本中。
- 您需要添加对
Azure.Storage.Blobs
的 NuGet 引用(它是针对12.8.4
编写的,您可能需要更新并解决任何重大更改)。 - 此脚本将所有 blob URI(及其 Azure 生成的 MD5 哈希)写入二进制文件以供其他进程使用。
static readonly DirectoryInfo _outputDirectory = new DirectoryInfo( Path.Combine(
Environment.GetFolderPath( Environment.SpecialFolder.Desktop ),
"BlobsDump"
) );
async Task Main() {
_outputDirectory.Create();
// Indexes 8,730,343 blobs in 6m38 seconds (impressive - compared to the few hours it took earlier)
// UPDATE: Using 2-character prefixes brings total time down to 1m54s to enumerate 9m blobs, wow!
BlobContainerClient cc = CreateBlobClient();
await ListBlobsAsync( cc );
}
private static BlobContainerClient CreateBlobClient()
{
const String cs = @"DefaultEndpointsProtocol=https;AccountName=mindyourownbusiness;AccountKey=werenostrangerstoloveyouknowtherulesandsodoiafullcommitmentswhatimthinkingofyouwouldntgetthisfromanyotherguy;BlobEndpoint=https://.blob.core.windows.net/;TableEndpoint=https://.table.core.windows.net/;";
BlobServiceClient c = new BlobServiceClient(cs);
BlobContainerClient cc = c.GetBlobContainerClient("container-name");
return cc;
}
private static readonly Char[] _hexDigits = new[] { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F' };
private static readonly IReadOnlyList<String> _blobNamePrefixes = _hexDigits.SelectMany( hd0 => _hexDigits.Select( hd1 => hd0.ToString() + hd1.ToString() ) ).ToList();
private static async Task ListBlobsAsync( BlobContainerClient cc )
{
ConcurrentDictionary<String,HashSet<String>> blobNamesPerPrefix = new ConcurrentDictionary<String,HashSet<String>>();
foreach( String prefix in _blobNamePrefixes )
{
blobNamesPerPrefix[prefix] = new HashSet<String>();
}
//
Task reportProgressTask = ReportProgressAsync( blobNamesPerPrefix );
List<Task> tasks = _blobNamePrefixes.Select( prefix => ListBlobsAsync( cc: cc, prefix: prefix, dict: blobNamesPerPrefix ) ).ToList();
await Task.WhenAll( tasks ).ConfigureAwait(false);
Int32 total = blobNamesPerPrefix.Values.Sum( s => s.Count );
}
const Double _lastKnownBlobCountApprox = 8925524; // As of 2020-09-07.
private static async Task ReportProgressAsync( ConcurrentDictionary<String,HashSet<String>> countsSoFar, CancellationToken ct = default )
{
var pb = new Util.ProgressBar( "Blobs indexed" );
pb.Dump();
while( true )
{
Int32 total = countsSoFar.Values.Sum( v => v.Count );
pb.Fraction = (Double)total / _lastKnownBlobCountApprox;
pb.Caption = ( $"{total:N0} blobs observed." );
await Task.Delay( 250 );
}
}
private static readonly UTF8Encoding _utf8NoBom = new UTF8Encoding( encoderShouldEmitUTF8Identifier: false, throwOnInvalidBytes: true );
private static async Task ListBlobsAsync( BlobContainerClient cc, String prefix, ConcurrentDictionary<String,HashSet<String>> dict )
{
HashSet<String> blobs = dict[prefix];
const Int32 ONE_MEGABYTE = 1 * 1024 * 1024;
String outputFileName = Path.Combine( _outputDirectory.FullName, prefix + ".dat" );
using( FileStream fs = new FileStream( outputFileName, FileMode.CreateNew, FileAccess.Write, FileShare.None, bufferSize: ONE_MEGABYTE, options: FileOptions.SequentialScan | FileOptions.Asynchronous ) )
using( MemoryStream ms = new MemoryStream( capacity: ONE_MEGABYTE ) )
using( BinaryWriter wtr = new BinaryWriter( ms, _utf8NoBom ) )
{
Stopwatch sw = Stopwatch.StartNew();
await ListBlobsInnerAsync( cc, prefix, blobs, wtr, fs, ms ).ConfigureAwait(false);
( $"Completed list with prefix \"{prefix}\". Blob count: {blobs.Count:N0}. Took {sw.ElapsedMilliseconds:N0}ms." ).Dump();
}
}
private static async Task ListBlobsInnerAsync( BlobContainerClient cc, String prefix, HashSet<String> blobs, BinaryWriter wtr, FileStream fs, MemoryStream ms )
{
Int32 i = 0;
String? continationToken = null;
do
{
System.Collections.Generic.IAsyncEnumerable<Azure.Page<BlobItem>> segment = cc.GetBlobsAsync( prefix: prefix ).AsPages( continationToken );
await foreach( Azure.Page<BlobItem>? page in segment.ConfigureAwait(false) )
{
continationToken = page.ContinuationToken;
if( page.Values.Last().Name[0] > prefix[0] ) break;
lock( blobs )
{
foreach( BlobItem bi in page.Values )
{
if( blobs.Add( bi.Name ) )
{
WriteBlobLine( ref i, bi, wtr );
}
}
}
// Flush:
wtr.Flush();
ms.Flush();
ms.Position = 0;
await ms.CopyToAsync( fs ).ConfigureAwait(false);
await fs.FlushAsync().ConfigureAwait(false);
ms.Position = 0;
ms.SetLength( 0 );
wtr.Seek( 0, SeekOrigin.Begin );
}
}
while( !String.IsNullOrWhiteSpace( continationToken ) );
}
private static void WriteBlobLine( ref Int32 i, BlobItem bi, BinaryWriter wtr )
{
wtr.Write( i );
wtr.Write( bi.Name ); // Length-prefixed string.
if( bi.Properties.ContentHash != null && bi.Properties.ContentHash.Length == 16 )
{
wtr.Write( bi.Properties.ContentHash.Length );
wtr.Write( bi.Properties.ContentHash );
}
else
{
wtr.Write( 0 );
}
i++;
}