Azure Data Lake: Facing an issue while moving data from Blob storage to ADLS
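I am creating an Azure Function in C# that does the following:
- extracts a zipped file from a blob,
- unzips it and copies the contents to Azure Data Lake Store.
I was able to unzip the file and upload it to another blob using the UploadFromStreamAsync(stream) function.
However, I am facing an issue doing the same for ADLS.
I referred to the link below and tried first creating the file with adlsFileSystemClient.FileSystem.Create
and then appending the stream to the data lake with adlsFileSystemClient.FileSystem.Append,
but it did not work.
- The Create method creates a zero-byte file, but Append does nothing, and the Azure Function still completes successfully without any errors. I also tried adlsFileSystemClient.FileSystem.AppendAsync,
with the same result.
Code: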
// Save the blob (zip file) contents to a MemoryStream.
using (var zipBlobFileStream = new MemoryStream())
{
    await blockBlob.DownloadToStreamAsync(zipBlobFileStream);
    await zipBlobFileStream.FlushAsync();
    zipBlobFileStream.Position = 0;
    // Use ZipArchive from System.IO.Compression to extract all the files from the zip file
    using (var zip = new ZipArchive(zipBlobFileStream))
    {
        // Each entry here represents an individual file or a folder
        foreach (var entry in zip.Entries)
        {
            string destfilename = $"{destcontanierPath2}/" + entry.FullName;
            log.Info($"DestFilename: {destfilename}");
            // Get a blob reference (blockBlob) for the actual file, with the same name as the file
            var blob = extractcontainer.GetBlockBlobReference($"{destfilename}");
            using (var stream = entry.Open())
            {
                // Check for file or folder and update the above blob reference with the actual content from the stream
                if (entry.Length > 0)
                {
                    await blob.UploadFromStreamAsync(stream);
                    // Creating a file and then append
                    adlsFileSystemClient.FileSystem.Create(_adlsAccountName, "/raw/Hello.txt", overwrite: true);
                    // Appending the stream to Azure Data Lake
                    using (var ms = new MemoryStream())
                    {
                        stream.CopyTo(ms);
                        ms.Position = 0; // rewind
                        log.Info($"**********MemoryStream: {ms}");
                        // do something with ms
                        await adlsFileSystemClient.FileSystem.AppendAsync(_adlsAccountName, "/raw/Hello.txt", ms, 0);
                    }
                }
            }
        }
    }
}
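The symptom described above (a zero-byte file and no error) is consistent with the entry stream already being at its end when `stream.CopyTo(ms)` runs: `blob.UploadFromStreamAsync(stream)` reads the forward-only stream returned by `entry.Open()` to the end first, so `ms` stays empty and the append writes nothing. Below is a minimal, Azure-free sketch of that effect using only `System.IO.Compression`; `ZipEntryStreamDemo` and its methods are hypothetical names, and plain `CopyTo` calls stand in for the blob and ADLS uploads.

```csharp
using System.IO;
using System.IO.Compression;
using System.Text;

// Hypothetical demo class, not part of the question's code or any Azure SDK.
public static class ZipEntryStreamDemo
{
    // Builds an in-memory zip with a single entry holding the UTF-8 bytes of `content`.
    public static MemoryStream BuildZip(string entryName, string content)
    {
        var zipStream = new MemoryStream();
        using (var archive = new ZipArchive(zipStream, ZipArchiveMode.Create, leaveOpen: true))
        using (var entryStream = archive.CreateEntry(entryName).Open())
        {
            byte[] bytes = Encoding.UTF8.GetBytes(content);
            entryStream.Write(bytes, 0, bytes.Length);
        }
        zipStream.Position = 0;
        return zipStream;
    }

    // Mirrors the question: one consumer reads the entry stream to the end,
    // then a second CopyTo runs on the same stream. Returns both byte counts.
    public static (long first, long second) ReadTwiceWithoutBuffering(Stream zip)
    {
        using (var archive = new ZipArchive(zip, ZipArchiveMode.Read, leaveOpen: true))
        using (var stream = archive.Entries[0].Open())
        {
            var firstCopy = new MemoryStream();
            stream.CopyTo(firstCopy);   // stands in for blob.UploadFromStreamAsync(stream)
            var secondCopy = new MemoryStream();
            stream.CopyTo(secondCopy);  // stream is already at its end, so nothing is copied
            return (firstCopy.Length, secondCopy.Length);
        }
    }

    // Buffer the entry once, then rewind the buffer before each consumer.
    public static (long first, long second) ReadTwiceWithBuffering(Stream zip)
    {
        using (var archive = new ZipArchive(zip, ZipArchiveMode.Read, leaveOpen: true))
        using (var stream = archive.Entries[0].Open())
        using (var ms = new MemoryStream())
        {
            stream.CopyTo(ms);
            ms.Position = 0;
            var firstCopy = new MemoryStream();
            ms.CopyTo(firstCopy);       // stands in for the blob upload
            ms.Position = 0;            // rewind before the second consumer
            var secondCopy = new MemoryStream();
            ms.CopyTo(secondCopy);      // stands in for the ADLS append
            return (firstCopy.Length, secondCopy.Length);
        }
    }
}
```

With a single entry containing the 5-byte text "hello", the unbuffered version sees 5 bytes and then 0, while the buffered version sees 5 bytes both times.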
New temporary workaround:
using (var zipBlobFileStream = new MemoryStream())
{
    await blockBlob.DownloadToStreamAsync(zipBlobFileStream);
    using (var zip = new ZipArchive(zipBlobFileStream))
    {
        // Each entry here represents an individual file or a folder
        foreach (var entry in zip.Entries)
        {
            entry.ExtractToFile(directoryPath + entry.FullName, true);
            // Upload the file to ADLS
            var parameters = new UploadParameters(directoryPath + entry.FullName, "/raw/" + md5, _adlsAccountName, isOverwrite: true, maxSegmentLength: 268435456 * 2);
            var frontend = new Microsoft.Azure.Management.DataLake.StoreUploader.DataLakeStoreFrontEndAdapter(_adlsAccountName, adlsFileSystemClient);
            var uploader = new DataLakeStoreUploader(parameters, frontend);
            uploader.Execute();
            File.Delete(directoryPath + entry.FullName);
        }
    }
}
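The workaround above extracts each entry to `directoryPath + entry.FullName` before uploading it. A sketch of that extract step, assuming a writable local directory, might skip folder entries (their `Name` is empty), create any subdirectories, and reject entry paths that resolve outside the target directory. `ZipTempExtractor` is a hypothetical helper, not part of any Azure SDK; `ExtractToFile` is the `System.IO.Compression.ZipFileExtensions` extension method (on .NET Framework it needs a reference to `System.IO.Compression.FileSystem`).

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;

// Hypothetical helper: extracts the file entries of a zip into a local directory.
public static class ZipTempExtractor
{
    public static List<string> ExtractEntriesToDirectory(Stream zip, string directoryPath)
    {
        var written = new List<string>();
        // Normalize the root so it always ends with a separator, for the prefix check below.
        string root = Path.GetFullPath(directoryPath);
        if (!root.EndsWith(Path.DirectorySeparatorChar.ToString(), StringComparison.Ordinal))
            root += Path.DirectorySeparatorChar;

        using (var archive = new ZipArchive(zip, ZipArchiveMode.Read, leaveOpen: true))
        {
            foreach (var entry in archive.Entries)
            {
                // Folder entries (FullName ends with '/') have an empty Name: nothing to extract.
                if (string.IsNullOrEmpty(entry.Name))
                    continue;

                string target = Path.GetFullPath(Path.Combine(root, entry.FullName));
                // Refuse entries such as "../x" that would land outside the target directory.
                if (!target.StartsWith(root, StringComparison.Ordinal))
                    throw new IOException($"Entry escapes target directory: {entry.FullName}");

                // Create nested folders (e.g. "sub/") before writing the file.
                Directory.CreateDirectory(Path.GetDirectoryName(target));
                entry.ExtractToFile(target, overwrite: true);
                written.Add(target);
            }
        }
        return written;
    }
}
```

Each returned path can then be handed to the uploader and deleted afterwards, as in the workaround.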
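For your case, you can change the code as follows, and it should then work. Move the file-creation code out of the foreach loop: calling FileSystem.Create with overwrite: true on every iteration replaces the file each time. Also, in your code blob.UploadFromStreamAsync(stream) has already read the entry stream to its end, so the following stream.CopyTo(ms) copies nothing and the append is empty. The version below copies each entry into a MemoryStream once and rewinds it (ms.Position = 0) before each upload.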
// Create the file once, before the loop, then append each entry to it
adlsFileSystemClient.FileSystem.Create(_adlsAccountName, "/raw/Hello.txt", overwrite: true);
using (var zipBlobFileStream = new MemoryStream())
{
    await blockBlob.DownloadToStreamAsync(zipBlobFileStream);
    await zipBlobFileStream.FlushAsync();
    zipBlobFileStream.Position = 0;
    // Use ZipArchive from System.IO.Compression to extract all the files from the zip file
    using (var zip = new ZipArchive(zipBlobFileStream))
    {
        // Each entry here represents an individual file or a folder
        foreach (var entry in zip.Entries)
        {
            string destfilename = $"{destcontanierPath2}/" + entry.FullName;
            log.Info($"DestFilename: {destfilename}");
            // Get a blob reference (blockBlob) for the actual file, with the same name as the file
            var blob = extractcontainer.GetBlockBlobReference($"{destfilename}");
            using (var stream = entry.Open())
            {
                // Check for file or folder and update the above blob reference with the actual content from the stream
                if (entry.Length > 0)
                {
                    // Buffer the entry once so both uploads can read it
                    using (MemoryStream ms = new MemoryStream())
                    {
                        stream.CopyTo(ms);
                        ms.Position = 0;
                        blob.UploadFromStream(ms);
                        ms.Position = 0; // rewind before the second upload
                        adlsFileSystemClient.FileSystem.Append(_adlsAccountName, "/raw/Hello.txt", ms);
                    }
                }
            }
        }
    }
}
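The effect of moving Create out of the loop can be illustrated without Azure. The sketch below uses a hypothetical `IAppendStore` interface with an in-memory fake; it only mimics the two calls used above (Create with overwrite, then Append) and is NOT the Azure Data Lake SDK's API.

```csharp
using System.Collections.Generic;
using System.IO;

// Hypothetical stand-in for FileSystem.Create / FileSystem.Append; not an Azure SDK type.
public interface IAppendStore
{
    void Create(string path);              // creates or replaces the file (like overwrite: true)
    void Append(string path, Stream data); // appends the stream's remaining bytes
}

// In-memory fake used only to show where Create should be called.
public sealed class InMemoryAppendStore : IAppendStore
{
    private readonly Dictionary<string, MemoryStream> _files = new Dictionary<string, MemoryStream>();

    public void Create(string path) => _files[path] = new MemoryStream();

    public void Append(string path, Stream data)
    {
        if (!_files.TryGetValue(path, out var file))
            throw new FileNotFoundException(path); // appending requires an existing file
        data.CopyTo(file); // writes at the end, since the position stays after the last write
    }

    public byte[] Read(string path) => _files[path].ToArray();
}

public static class AppendPattern
{
    // Answer's pattern: Create once before the loop, then append each buffered chunk.
    public static void CreateOnceThenAppend(IAppendStore store, string path, IEnumerable<byte[]> chunks)
    {
        store.Create(path);
        foreach (var chunk in chunks)
            using (var ms = new MemoryStream(chunk))
                store.Append(path, ms);
    }

    // Question's pattern: Create inside the loop replaces the file on every iteration,
    // so only the last chunk survives.
    public static void CreateInsideLoop(IAppendStore store, string path, IEnumerable<byte[]> chunks)
    {
        foreach (var chunk in chunks)
        {
            store.Create(path);
            using (var ms = new MemoryStream(chunk))
                store.Append(path, ms);
        }
    }
}
```

Appending chunks of 2 and 1 bytes yields a 3-byte file with the create-once pattern, but only a 1-byte file when Create runs inside the loop.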