读取 \yyyy\mm\dd\HH\MM\ 范围内的文件
Reading files in \yyyy\mm\dd\HH\MM\ ranges
我有一个 PySpark 应用程序需要从 Azure blob 存储帐户读取文件,其中文件每 5 分钟按以下格式分区到文件夹中:
\Root\yyyy\mm\dd\HH\MM\files.csv
我有一个每小时 运行 的进程,我想处理自上次 运行 以来的所有文件(如果 运行 被遗漏,可能会超过一个小时).我管理一个高水位线,它告诉我上次处理文件夹的时间。
文件中还有一个日期时间字段,它与路径日期时间相匹配(第二个细节更详细)。
请注意,我无法将文件夹结构更改为年=yyyy\month=mm 等的 Sparks 首选分区方法。
我写过这个函数:
from datetime import datetime
def folderDateTimeRange(startDateTime, endDateTime, levels=5):
if startDateTime.year != endDateTime.year:
return '/{*}' * levels
elif startDateTime.month != endDateTime.month:
return datetime.strftime(startDateTime, '%Y') + '/{*}' * (levels - 1)
elif startDateTime.day != endDateTime.day:
return datetime.strftime(startDateTime, '%Y/%m') + '/{*}' * (levels - 2)
elif startDateTime.hour != endDateTime.hour:
return datetime.strftime(startDateTime, '%Y/%m/%d') + '/{*}' * (levels - 3)
else:
return ""
这限制了大多数情况下读取的文件夹数量。我仍然需要过滤数据由传递给函数的相同开始和结束时间读取,因为 23:00 到 01:00 第二天会 return {*} 并且小时部分 - 因此我认为这可能更有效。
在最坏的例子中,你传入 start = 2018-12-31 22:00:00 和 end = 2019-01-01 01:00:00 - 这会导致读取所有年份的所有数据.
我对 glob 的了解有限 - 但是否可以传递 运行ge 而不是 {*}?
是的,您可以使用花括号 return 项目列表,或者您可以使用正则表达式。
在此处查看: and here: pyspark select subset of files using regex/glob from s3(我不确定 Azure 和 S3 有多少不同,但我的假设是 PySpark 可以将其抽象化;如果我错了请纠正我。)
您还可以通过生成几条路径并发送它们而不是仅发送一条路径来最大程度地减少 'waste' 读取文件的次数(这确保您不会遇到读取两年数据的相同陷阱,如果当你从一年跨越到下一年。)
为了好玩,我在底部编写了一些带有一些测试内容的小代码,您可能可以 return 这些列表并获得您想要的内容:
from datetime import datetime as dt
from datetime import timedelta
from collections import defaultdict
# \Root\yyyy\mm\dd\HH\MM\files.csv
def folderDateTimeRange(start, end, levels=5):
start_iter = start
paths = defaultdict(lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(list))))
while start_iter < end:
paths[start_iter.year][start_iter.month][start_iter.day][start_iter.hour].append(start_iter.minute)
start_iter += timedelta(minutes=5)
ret_paths = []
for year, v1 in paths.items():
path = '{}\'.format(year)
for month, v2 in v1.items():
path += '{}\'.format(month)
for day, v3 in v2.items():
path += '{}\'.format(day)
path += '{{{}}}\{{*}}'.format(','.join([str(_) for _ in v3.keys()]))
ret_paths.append(path)
return ret_paths
def test(a, b):
res = folderDateTimeRange(a, b)
for r in res:
print(r)
print('---')
test(dt(2018, 1, 1), dt(2018, 1, 2))
test(dt(2018, 12, 31), dt(2019, 1, 2))
我有一个 PySpark 应用程序需要从 Azure blob 存储帐户读取文件,其中文件每 5 分钟按以下格式分区到文件夹中:
\Root\yyyy\mm\dd\HH\MM\files.csv
我有一个每小时 运行 的进程,我想处理自上次 运行 以来的所有文件(如果 运行 被遗漏,可能会超过一个小时).我管理一个高水位线,它告诉我上次处理文件夹的时间。
文件中还有一个日期时间字段,它与路径日期时间相匹配(第二个细节更详细)。
请注意,我无法将文件夹结构更改为年=yyyy\month=mm 等的 Sparks 首选分区方法。
我写过这个函数:
from datetime import datetime
def folderDateTimeRange(startDateTime, endDateTime, levels=5):
if startDateTime.year != endDateTime.year:
return '/{*}' * levels
elif startDateTime.month != endDateTime.month:
return datetime.strftime(startDateTime, '%Y') + '/{*}' * (levels - 1)
elif startDateTime.day != endDateTime.day:
return datetime.strftime(startDateTime, '%Y/%m') + '/{*}' * (levels - 2)
elif startDateTime.hour != endDateTime.hour:
return datetime.strftime(startDateTime, '%Y/%m/%d') + '/{*}' * (levels - 3)
else:
return ""
这限制了大多数情况下读取的文件夹数量。我仍然需要过滤数据由传递给函数的相同开始和结束时间读取,因为 23:00 到 01:00 第二天会 return {*} 并且小时部分 - 因此我认为这可能更有效。
在最坏的例子中,你传入 start = 2018-12-31 22:00:00 和 end = 2019-01-01 01:00:00 - 这会导致读取所有年份的所有数据.
我对 glob 的了解有限 - 但是否可以传递 运行ge 而不是 {*}?
是的,您可以使用花括号 return 项目列表,或者您可以使用正则表达式。
在此处查看:
您还可以通过生成几条路径并发送它们而不是仅发送一条路径来最大程度地减少 'waste' 读取文件的次数(这确保您不会遇到读取两年数据的相同陷阱,如果当你从一年跨越到下一年。)
为了好玩,我在底部编写了一些带有一些测试内容的小代码,您可能可以 return 这些列表并获得您想要的内容:
from datetime import datetime as dt
from datetime import timedelta
from collections import defaultdict
# \Root\yyyy\mm\dd\HH\MM\files.csv
def folderDateTimeRange(start, end, levels=5):
start_iter = start
paths = defaultdict(lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(list))))
while start_iter < end:
paths[start_iter.year][start_iter.month][start_iter.day][start_iter.hour].append(start_iter.minute)
start_iter += timedelta(minutes=5)
ret_paths = []
for year, v1 in paths.items():
path = '{}\'.format(year)
for month, v2 in v1.items():
path += '{}\'.format(month)
for day, v3 in v2.items():
path += '{}\'.format(day)
path += '{{{}}}\{{*}}'.format(','.join([str(_) for _ in v3.keys()]))
ret_paths.append(path)
return ret_paths
def test(a, b):
res = folderDateTimeRange(a, b)
for r in res:
print(r)
print('---')
test(dt(2018, 1, 1), dt(2018, 1, 2))
test(dt(2018, 12, 31), dt(2019, 1, 2))