仅在完全完成数据采集后才读取 hdf5 文件
Reading an hdf5 file only after it has completely finished acquiring data
数据将保存为hdf5文件,但保存一个文件总共需要大约30秒。一旦数据保存在一个 hdf5 文件中,该文件将立即使用,直到下一个 hdf5 文件完成,该过程将像这样继续。有没有一种简单的方法来检查 hd5 文件是否已完成加载,然后才能使用? hdf5 文件大约为 10-20MB,将全部保存在同一个文件夹中。当然,我也许可以将计时器设置为某种形式的 30 秒以上,但我有兴趣将时间保持得尽可能短,这意味着我需要确切地知道每个 hdf5 文件何时完成获取数据。
我有几个想法:
- 测量从一个时间点到另一个时间点的文件大小差异。如果没有变化,则认为文件已完成加载。
- 我不太了解 hdf5 文件,但也许每个 hdf5 文件的末尾都有一些东西,而且只有在末尾。如果是这样,我可以继续检查最后一个组件的值是否存在。如果存在,则文件必须完成。
有什么想法吗?如果有任何帮助,我将不胜感激。
编辑:
我对 hdf5 部分的想法 on_created
:
class CustomHandler(FileSystemEventHandler):
def __init__(self, callback: Callable):
self.callback = callback
# Store callback to be called on every on_created event
def on_created(self, event: Union[DirCreatedEvent, FileCreatedEvent]):
#print(f"Event type: {event.event_type}\nAt: {event.src_path}\n")
# check if it's File creation, not Directory creation
if isinstance(event, FileCreatedEvent):
file = pathlib.Path(event.src_path)
#print(f"Processing file {file.name}\n")
# call callback
#self.callback(file)
wait = 3
max_wait = 30
waited = 0
while True:
try:
h5py.File(self.callback(file), 'r')
return self.callback(file)
except FileNotFoundError:
print('Error: HDF5 File not found')
return None
except OSError:
if waited < max_wait:
print(f'Error: HDF5 File locked, sleeping {wait} seconds...')
time.sleep(wait)
waited += wait
else:
print(f'waited too long= {waited} secs')
return None
你要的是'file locking'。好消息:这在 HDF5 库构建中启用(默认情况下)。而且,更好的是,它在 h5py 包中启用!因此,如果您尝试打开一个由另一个程序打开以供写入的文件,您将得到一个异常。我们可以利用这个例外来发挥我们的优势。挑战在于将文件锁定异常与其他潜在的文件打开异常(如文件不存在)区分开来。
坦率地说,我更喜欢 Python 的 with/as:
上下文管理器来打开文件。但是,它以相同的方式处理所有异常(不打开并退出)。所以,我们需要一种方法来区别处理不同的异常。我怀疑自定义文件上下文管理器是执行此操作的最 Pythonic 方法。然而,这超出了我的专业范围。
相反,我写了一个你用文件名调用的函数它在 while
循环中使用 try/except:
打开文件。将发生以下 3 件事之一:
- 它 returns h5py 文件对象,如果它打开文件。
- 它立即returns
None
如果文件不存在。
- 如果它被锁定,它会休眠,然后重试。如果超过时间还打不开,返回None。
使用此功能时记得使用.close()
方法!
代码已于 2021-09-09 更新,以使用 argparse
模块将 HDF5 文件名作为必需的命令行参数传递。
更新了以下代码:
import h5py
import argparse
import sys
import time
def h5_open_wait(h5file):
wait = 3
max_wait = 30
waited = 0
while True:
try:
h5f = h5py.File(h5file,'r')
return h5f
except FileNotFoundError:
print('Error: HDF5 File not found')
return None
except OSError:
if waited < max_wait:
print(f'Error: HDF5 File locked, sleeping {wait} seconds...')
time.sleep(wait)
waited += wait
else:
print(f'waited too long= {waited} secs')
return None
def get_job_options():
# Note that HDF5 file name is only parameter and is required;
parser = argparse.ArgumentParser(description='Check HDF5 file is available to open.')
parser.add_argument('hdf5', help='HDF5 filename (Required)' )
if len(sys.argv)==1:
# display help message when no args are passed.
parser.print_help()
sys.exit('Error: No HDF5 file name specified; exiting.')
args = parser.parse_args()
HDF5_FILE = args.hdf5
#print ('HDF5 file = %s' % args.hdf5)
return (HDF5_FILE)
####################
h5file = get_job_options()
start = time.time()
h5f = h5_open_wait(h5file)
if h5f is None:
sys.exit('Error: HDF5 File not opened')
# do something with the file
for ds, obj in h5f.items():
print(f'ds name={ds}; shape={obj.shape}')
h5f.close()
print(f'\nTime to read all datasets = {time.time()-start:.2f} secs')
为了测试,我写了一个简单的程序,从一个大数组中创建 800 个数据集。 (下面的代码。)要测试,首先启动它,然后 运行 上面的代码以查看它是如何等待的。根据您的系统速度调整上面的 max_wait
和下面的 a0
和 cnt
。
创建上面使用的示例文件的代码:
start = time.time()
a0 = 1000
cnt = 800
arr = np.random.random(a0*a0).reshape(a0,a0)
with h5py.File('SO_69067142.h5','w') as h5f:
for dcnt in range(cnt):
h5f.create_dataset(f'ds_{dcnt:03}',data=arr)
print(f'Time to create {cnt} datasets={time.time()-start:.2f}')
根据您的评论和我们的讨论,最简单的实现可能是“等待”文件的函数,但不 return h5py 文件对象。这样您仍然可以使用标准上下文管理器:(例如,with h5py.File() as h5f:
)并且避免需要在主程序中关闭文件。
我将修改后的函数发布为新答案(重命名为 h5_wait
)以避免混淆(我的第一个答案具有原始函数 h5_open_wait
)。此功能类似,但 return 是一个 True/False
标志而不是 h5py 文件对象。它通过调用 h5py.File()
检查文件状态,然后在退出函数之前关闭。它还使用 sys.argv
获取 HDF5 文件名(如 sys.argv[1]
)。
查看下面的新代码:
import h5py
import sys
import time
def h5_wait(h5file):
wait = 3
max_wait = 30
waited = 0
while True:
try:
h5f = h5py.File(h5file,'r')
break
except FileNotFoundError:
print('\nError: HDF5 File not found\n')
return False
except OSError:
if waited < max_wait:
print(f'Warning: HDF5 File locked, sleeping {wait} seconds...')
time.sleep(wait)
waited += wait
else:
print(f'\nWaited too long= {waited} secs, exiting...\n')
return False
h5f.close()
return True
####################
if len(sys.argv) != 2:
sys.exit('Include HDF5 file name on command line.')
h5file = sys.argv[1]
h5stat = h5_wait(h5file)
if h5stat is False:
sys.exit('Error: HDF5 File not available')
with h5py.File(h5file) as h5f:
# do something with the file
start = time.time()
for ds, obj in h5f.items():
print(f'ds name={ds}; shape={obj.shape}')
print(f'\nTime to read {len(list(h5f.keys()))} datasets = {time.time()-start:.2f} secs')
数据将保存为hdf5文件,但保存一个文件总共需要大约30秒。一旦数据保存在一个 hdf5 文件中,该文件将立即使用,直到下一个 hdf5 文件完成,该过程将像这样继续。有没有一种简单的方法来检查 hd5 文件是否已完成加载,然后才能使用? hdf5 文件大约为 10-20MB,将全部保存在同一个文件夹中。当然,我也许可以将计时器设置为某种形式的 30 秒以上,但我有兴趣将时间保持得尽可能短,这意味着我需要确切地知道每个 hdf5 文件何时完成获取数据。
我有几个想法:
- 测量从一个时间点到另一个时间点的文件大小差异。如果没有变化,则认为文件已完成加载。
- 我不太了解 hdf5 文件,但也许每个 hdf5 文件的末尾都有一些东西,而且只有在末尾。如果是这样,我可以继续检查最后一个组件的值是否存在。如果存在,则文件必须完成。
有什么想法吗?如果有任何帮助,我将不胜感激。
编辑:
我对 hdf5 部分的想法 on_created
:
class CustomHandler(FileSystemEventHandler):
def __init__(self, callback: Callable):
self.callback = callback
# Store callback to be called on every on_created event
def on_created(self, event: Union[DirCreatedEvent, FileCreatedEvent]):
#print(f"Event type: {event.event_type}\nAt: {event.src_path}\n")
# check if it's File creation, not Directory creation
if isinstance(event, FileCreatedEvent):
file = pathlib.Path(event.src_path)
#print(f"Processing file {file.name}\n")
# call callback
#self.callback(file)
wait = 3
max_wait = 30
waited = 0
while True:
try:
h5py.File(self.callback(file), 'r')
return self.callback(file)
except FileNotFoundError:
print('Error: HDF5 File not found')
return None
except OSError:
if waited < max_wait:
print(f'Error: HDF5 File locked, sleeping {wait} seconds...')
time.sleep(wait)
waited += wait
else:
print(f'waited too long= {waited} secs')
return None
你要的是'file locking'。好消息:这在 HDF5 库构建中启用(默认情况下)。而且,更好的是,它在 h5py 包中启用!因此,如果您尝试打开一个由另一个程序打开以供写入的文件,您将得到一个异常。我们可以利用这个例外来发挥我们的优势。挑战在于将文件锁定异常与其他潜在的文件打开异常(如文件不存在)区分开来。
坦率地说,我更喜欢 Python 的 with/as:
上下文管理器来打开文件。但是,它以相同的方式处理所有异常(不打开并退出)。所以,我们需要一种方法来区别处理不同的异常。我怀疑自定义文件上下文管理器是执行此操作的最 Pythonic 方法。然而,这超出了我的专业范围。
相反,我写了一个你用文件名调用的函数它在 while
循环中使用 try/except:
打开文件。将发生以下 3 件事之一:
- 它 returns h5py 文件对象,如果它打开文件。
- 它立即returns
None
如果文件不存在。 - 如果它被锁定,它会休眠,然后重试。如果超过时间还打不开,返回None。
使用此功能时记得使用.close()
方法!
代码已于 2021-09-09 更新,以使用 argparse
模块将 HDF5 文件名作为必需的命令行参数传递。
更新了以下代码:
import h5py
import argparse
import sys
import time
def h5_open_wait(h5file):
wait = 3
max_wait = 30
waited = 0
while True:
try:
h5f = h5py.File(h5file,'r')
return h5f
except FileNotFoundError:
print('Error: HDF5 File not found')
return None
except OSError:
if waited < max_wait:
print(f'Error: HDF5 File locked, sleeping {wait} seconds...')
time.sleep(wait)
waited += wait
else:
print(f'waited too long= {waited} secs')
return None
def get_job_options():
# Note that HDF5 file name is only parameter and is required;
parser = argparse.ArgumentParser(description='Check HDF5 file is available to open.')
parser.add_argument('hdf5', help='HDF5 filename (Required)' )
if len(sys.argv)==1:
# display help message when no args are passed.
parser.print_help()
sys.exit('Error: No HDF5 file name specified; exiting.')
args = parser.parse_args()
HDF5_FILE = args.hdf5
#print ('HDF5 file = %s' % args.hdf5)
return (HDF5_FILE)
####################
h5file = get_job_options()
start = time.time()
h5f = h5_open_wait(h5file)
if h5f is None:
sys.exit('Error: HDF5 File not opened')
# do something with the file
for ds, obj in h5f.items():
print(f'ds name={ds}; shape={obj.shape}')
h5f.close()
print(f'\nTime to read all datasets = {time.time()-start:.2f} secs')
为了测试,我写了一个简单的程序,从一个大数组中创建 800 个数据集。 (下面的代码。)要测试,首先启动它,然后 运行 上面的代码以查看它是如何等待的。根据您的系统速度调整上面的 max_wait
和下面的 a0
和 cnt
。
创建上面使用的示例文件的代码:
start = time.time()
a0 = 1000
cnt = 800
arr = np.random.random(a0*a0).reshape(a0,a0)
with h5py.File('SO_69067142.h5','w') as h5f:
for dcnt in range(cnt):
h5f.create_dataset(f'ds_{dcnt:03}',data=arr)
print(f'Time to create {cnt} datasets={time.time()-start:.2f}')
根据您的评论和我们的讨论,最简单的实现可能是“等待”文件的函数,但不 return h5py 文件对象。这样您仍然可以使用标准上下文管理器:(例如,with h5py.File() as h5f:
)并且避免需要在主程序中关闭文件。
我将修改后的函数发布为新答案(重命名为 h5_wait
)以避免混淆(我的第一个答案具有原始函数 h5_open_wait
)。此功能类似,但 return 是一个 True/False
标志而不是 h5py 文件对象。它通过调用 h5py.File()
检查文件状态,然后在退出函数之前关闭。它还使用 sys.argv
获取 HDF5 文件名(如 sys.argv[1]
)。
查看下面的新代码:
import h5py
import sys
import time
def h5_wait(h5file):
wait = 3
max_wait = 30
waited = 0
while True:
try:
h5f = h5py.File(h5file,'r')
break
except FileNotFoundError:
print('\nError: HDF5 File not found\n')
return False
except OSError:
if waited < max_wait:
print(f'Warning: HDF5 File locked, sleeping {wait} seconds...')
time.sleep(wait)
waited += wait
else:
print(f'\nWaited too long= {waited} secs, exiting...\n')
return False
h5f.close()
return True
####################
if len(sys.argv) != 2:
sys.exit('Include HDF5 file name on command line.')
h5file = sys.argv[1]
h5stat = h5_wait(h5file)
if h5stat is False:
sys.exit('Error: HDF5 File not available')
with h5py.File(h5file) as h5f:
# do something with the file
start = time.time()
for ds, obj in h5f.items():
print(f'ds name={ds}; shape={obj.shape}')
print(f'\nTime to read {len(list(h5f.keys()))} datasets = {time.time()-start:.2f} secs')