是否可以使用 azure python sdk 根据上次修改时间过滤 azure data lake 文件?

Can azure data lake files be filtered based on Last Modified time using azure python sdk?

我正在尝试对存储在 azure datalake 中的文件执行内存操作。我无法在不使用 ADL 下载程序的情况下找到有关使用匹配模式的文档。

对于单个文件,这是我使用的代码

filename = '/<folder/<filename>.json'
with adlsFileSystemClient.open(filename) as f:
    for line in f:
         <file-operations>

但是我们如何根据文件名(字符串匹配)或最后修改日期进行过滤。

当我使用 U-SQL 时,我可以选择根据最后修改的选项过滤文件集。

DECLARE EXTERNAL @TodaysTime = DateTime.UtcNow.AddDays(-1);

@rawInput=
    EXTRACT jsonString string,
            uri = FILE.URI()
            ,modified_date = FILE.MODIFIED()
    FROM @in
    USING Extractors.Tsv(quoting : true);



@parsedInput=
    SELECT *
    FROM @rawInput
    WHERE modified_date > @TodaysTime;

在使用adlsFileSystemClient时,是否有类似的选项来过滤指定时间段内修改的文件?

Github 问题:https://github.com/Azure/azure-data-lake-store-python/issues/300

感谢任何帮助。

注:

下面akharit in GitHub recently. I am providing his answer回答了这个问题,解决了我的要求。

**adls sdk 本身没有任何构建功能,因为没有服务器端 api 将 return 仅在过去 4 小时内修改过的文件。 在获得所有条目的列表后,编写代码来执行此操作应该很容易。 自unix纪元以来的修改时间字段returns毫秒,您可以通过

将其转换为python datetime对象
from datetime import datetime, timedelta
 datetime.fromtimestamp(file['modificationTime'] / 1000)

然后是

    filtered = [file['name'] for file in adl.ls('/', detail=True) if (datetime.now() - datetime.fromtimestamp(file['modificationTime']/1000)) > timedelta(hours = 4)]

您也可以使用 walk 代替 ls 进行递归枚举。

**

基于下面的代码,您可以找到包含 last_modified 数据的文件属性的容器级目录和文件名。所以你可以根据 last_modified 日期来控制文件。

from pyspark.sql.functions import col
from azure.storage.blob import BlockBlobService
from datetime import datetime
block_blob_service = BlockBlobService(account_name='acccount_name', account_key='account-key')
container_name ='Contaniner_name'
second_conatainer_name ='Contaniner_name_second'
#block_blob_service.create_container(container_name)
generator = block_blob_service.list_blobs(container_name,prefix="Recovery/")
report_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

myfile = open('/dbfs/adlsaudit/auditfiles2', 'w')
for blob in generator:
    length = BlockBlobService.get_blob_properties(block_blob_service,container_name,blob.name).properties.content_length
    last_modified = BlockBlobService.get_blob_properties(block_blob_service,container_name,blob.name).properties.last_modified
    file_size = BlockBlobService.get_blob_properties(block_blob_service,container_name,blob.name).properties.content_length
  #  print("\t Recovery: " + blob.name,":" +str(length),":" + str(last_modified))
    line = container_name+'|'+second_conatainer_name+'|'+blob.name+'|'+ str(file_size) +'|'+str(last_modified)+'|'+str(report_time)
    myfile.write(line+'\n')
myfile.close()